diff options
| author | Keith Wall <kwall@apache.org> | 2014-03-12 11:28:49 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-03-12 11:28:49 +0000 |
| commit | a038ecc0c2da1fdd8cff38051bba3c8ffca4df1e (patch) | |
| tree | ac67abd1b2d49ebaab052a792509a3f21be77ea3 /qpid/java/bdbstore/src/test | |
| parent | b5d82ab769e4368c88fbbe6503b571303a5dcd14 (diff) | |
| download | qpid-python-a038ecc0c2da1fdd8cff38051bba3c8ffca4df1e.tar.gz | |
QPID-5410: [Java Broker/BDB]. Introduce a thin facade (EnvironmentFacade) between the BDBMessage and BDB JE's Environment/ReplicatedEnvironment. The motivation behind this facade is principally HA; there are a number of cases where JE requires the ReplicatedEnvironment is recreated. The facade layer allows for this to be done transparently from the upper tiers (the BDBMessageStore). The facade has two implementations StandardFacade used in the non-HA use case, and ReplicatedEnvironmentFacade in the HA case.
Key changes:
* BDBHAVirtualHost is now responsible for the creation of ReplicatedEnvironmentFacade
* BDBMessageStore reverts to a single implementation without knowledge of HA.
* BDBMessageStore now interacts with JE via the facade.
* BDBHAMessageStoreManagerMBean interrogates the facade
* ReplicatedEnvironmentFacade monitors the group for changes in state (nodes becoming uncontactable etc), if such a state change is detected, the DatabasePinger
fires a single transaction to determine if quorum still exists. If quorum does not exist, the environment is restarted, thus transition the environment into
the UNKNOWN state.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1576697 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/test')
8 files changed, 748 insertions, 227 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java deleted file mode 100644 index c2b3aeab3e..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.net.InetAddress; - -import java.util.HashMap; -import java.util.Map; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.util.FileUtils; - -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.rep.ReplicatedEnvironment; -import com.sleepycat.je.rep.ReplicationConfig; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class BDBHAMessageStoreTest extends QpidTestCase -{ - private static final String TEST_LOG_FILE_MAX = "1000000"; - private static final String TEST_ELECTION_RETRIES = "1000"; - private static final String TEST_NUMBER_OF_THREADS = "10"; - private static final String TEST_ENV_CONSISTENCY_TIMEOUT = "9999999"; - private String _groupName; - private String _workDir; - private int _masterPort; - private String _host; - private XMLConfiguration _configXml; - private VirtualHost _virtualHost; - private org.apache.qpid.server.model.VirtualHost _modelVhost; - - public void setUp() throws Exception - { - super.setUp(); - - _workDir = TMP_FOLDER + File.separator + getName(); - _host = InetAddress.getByName("localhost").getHostAddress(); - _groupName = "group" + getName(); - _masterPort = -1; - - FileUtils.delete(new File(_workDir), true); - _configXml = new XMLConfiguration(); - _modelVhost = mock(org.apache.qpid.server.model.VirtualHost.class); - - - BrokerTestHelper.setUp(); - } - - public void tearDown() throws Exception - { - try - { - if (_virtualHost != null) - { - _virtualHost.close(); - } - FileUtils.delete(new File(_workDir), true); - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - public void testSetSystemConfiguration() throws Exception - { - // create virtual host configuration, registry and host instance - addVirtualHostConfiguration(); - String vhostName = "test" + _masterPort; - VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, _configXml.subset("virtualhosts.virtualhost." + vhostName), BrokerTestHelper.createBrokerMock()); - - _virtualHost = BrokerTestHelper.createVirtualHost(configuration,new VirtualHostRegistry(new EventLogger()),_modelVhost); - BDBHAMessageStore store = (BDBHAMessageStore) _virtualHost.getMessageStore(); - - // test whether JVM system settings were applied - Environment env = store.getEnvironment(); - assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS)); - assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX)); - - ReplicatedEnvironment repEnv = store.getReplicatedEnvironment(); - assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES, - repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES)); - assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT, - repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT)); - } - - private void addVirtualHostConfiguration() throws Exception - { - int port = findFreePort(); - if (_masterPort == -1) - { - _masterPort = port; - } - String nodeName = getNodeNameForNodeAt(port); - - String vhostName = "test" + port; - String vhostPrefix = "virtualhosts.virtualhost." + vhostName; - - _configXml.addProperty("virtualhosts.virtualhost.name", vhostName); - _configXml.addProperty(vhostPrefix + ".type", BDBHAVirtualHostFactory.TYPE); - - when(_modelVhost.getAttribute(eq(_modelVhost.STORE_PATH))).thenReturn(_workDir + File.separator - + port); - when(_modelVhost.getAttribute(eq("haGroupName"))).thenReturn(_groupName); - when(_modelVhost.getAttribute(eq("haNodeName"))).thenReturn(nodeName); - when(_modelVhost.getAttribute(eq("haNodeAddress"))).thenReturn(getNodeHostPortForNodeAt(port)); - when(_modelVhost.getAttribute(eq("haHelperAddress"))).thenReturn(getHelperHostPort()); - - Map<String,String> bdbEnvConfig = new HashMap<String,String>(); - bdbEnvConfig.put(EnvironmentConfig.CLEANER_THREADS, TEST_NUMBER_OF_THREADS); - bdbEnvConfig.put(EnvironmentConfig.LOG_FILE_MAX, TEST_LOG_FILE_MAX); - - when(_modelVhost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(bdbEnvConfig); - - Map<String,String> repConfig = new HashMap<String,String>(); - repConfig.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, TEST_ELECTION_RETRIES); - repConfig.put(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT, TEST_ENV_CONSISTENCY_TIMEOUT); - when(_modelVhost.getAttribute(eq("haReplicationConfig"))).thenReturn(repConfig); - - } - - private String getNodeNameForNodeAt(final int bdbPort) - { - return "node" + getName() + bdbPort; - } - - private String getNodeHostPortForNodeAt(final int bdbPort) - { - return _host + ":" + bdbPort; - } - - private String getHelperHostPort() - { - if (_masterPort == -1) - { - throw new IllegalStateException("Helper port not yet assigned."); - } - return _host + ":" + _masterPort; - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java deleted file mode 100644 index 7f7b65f315..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.test.utils.QpidTestCase; - -import static org.mockito.Mockito.mock; - -public class HAMessageStoreSmokeTest extends QpidTestCase -{ - private final BDBHAMessageStore _store = new BDBHAMessageStore(); - - public void testMissingHAConfigThrowsException() throws Exception - { - try - { - _store.configure(mock(VirtualHost.class)); - fail("Expected an exception to be thrown"); - } - catch (ServerScopedRuntimeException ce) - { - assertTrue(ce.getMessage().contains("BDB HA configuration key not found")); - } - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java index 730001d849..385681446a 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java @@ -22,20 +22,14 @@ package org.apache.qpid.server.store.berkeleydb; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreCreator; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.test.utils.QpidTestCase; public class MessageStoreCreatorTest extends QpidTestCase { - private static final String[] STORE_TYPES = {BDBMessageStore.TYPE}; - public void testMessageStoreCreator() { MessageStoreCreator messageStoreCreator = new MessageStoreCreator(); - for (String type : STORE_TYPES) - { - MessageStore store = messageStoreCreator.createMessageStore(type); - assertNotNull("Store of type " + type + " is not created", store); - } - } -} + String type = new BDBMessageStoreFactory().getType(); + MessageStore store = messageStoreCreator.createMessageStore(type); + assertNotNull("Store of type " + type + " is not created", store); + }} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java new file mode 100644 index 0000000000..b19e18b204 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.util.Collections; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.Environment; + +public class StandardEnvironmentFacadeTest extends QpidTestCase +{ + protected File _storePath; + protected EnvironmentFacade _environmentFacade; + + protected void setUp() throws Exception + { + super.setUp(); + _storePath = new File(TMP_FOLDER + File.separator + "bdb" + File.separator + getTestName()); + } + + protected void tearDown() throws Exception + { + try + { + super.tearDown(); + if (_environmentFacade != null) + { + _environmentFacade.close(); + } + } + finally + { + if (_storePath != null) + { + FileUtils.delete(_storePath, true); + } + } + } + + public void testEnvironmentFacade() throws Exception + { + EnvironmentFacade ef = getEnvironmentFacade(); + assertNotNull("Environment should not be null", ef); + Environment e = ef.getEnvironment(); + assertTrue("Environment is not valid", e.isValid()); + } + + public void testClose() throws Exception + { + EnvironmentFacade ef = getEnvironmentFacade(); + ef.close(); + Environment e = ef.getEnvironment(); + + assertNull("Environment should be null after facade close", e); + } + + public void testOpenDatabases() throws Exception + { + EnvironmentFacade ef = getEnvironmentFacade(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1", "test2"); + Database test1 = ef.getOpenDatabase("test1"); + Database test2 = ef.getOpenDatabase("test2"); + + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName()); + } + + public void testGetOpenDatabaseForNonExistingDatabase() throws Exception + { + EnvironmentFacade ef = getEnvironmentFacade(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1"); + Database test1 = ef.getOpenDatabase("test1"); + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + try + { + ef.getOpenDatabase("test2"); + fail("An exception should be thrown for the non existing database"); + } + catch(IllegalArgumentException e) + { + assertEquals("Unexpected exception message", "Database with name 'test2' has not been opened", e.getMessage()); + } + } + + EnvironmentFacade getEnvironmentFacade() throws Exception + { + if (_environmentFacade == null) + { + _environmentFacade = createEnvironmentFacade(); + } + return _environmentFacade; + } + + EnvironmentFacade createEnvironmentFacade() + { + return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap()); + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java new file mode 100644 index 0000000000..a05a30b459 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java @@ -0,0 +1,208 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.qpid.server.configuration.ConfigurationEntry; +import org.apache.qpid.server.configuration.ConfigurationEntryStore; +import org.apache.qpid.server.configuration.RecovererProvider; +import org.apache.qpid.server.configuration.startup.VirtualHostRecoverer; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.utils.TestFileUtils; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; + +public class VirtualHostTest extends QpidTestCase +{ + + private Broker _broker; + private StatisticsGatherer _statisticsGatherer; + private RecovererProvider _recovererProvider; + private File _configFile; + private File _bdbStorePath; + private VirtualHost _host; + private ConfigurationEntryStore _store; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _store = mock(ConfigurationEntryStore.class); + _broker = BrokerTestHelper.createBrokerMock(); + TaskExecutor taslExecutor = mock(TaskExecutor.class); + when(taslExecutor.isTaskExecutorThread()).thenReturn(true); + when(_broker.getTaskExecutor()).thenReturn(taslExecutor); + + + _statisticsGatherer = mock(StatisticsGatherer.class); + + _bdbStorePath = new File(TMP_FOLDER, getTestName() + "." + System.currentTimeMillis()); + _bdbStorePath.deleteOnExit(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + if (_host != null) + { + _host.setDesiredState(_host.getState(), State.STOPPED); + } + } + finally + { + if (_configFile != null) + { + _configFile.delete(); + } + if (_bdbStorePath != null) + { + FileUtils.delete(_bdbStorePath, true); + } + super.tearDown(); + } + } + + + public void testCreateBdbVirtualHostFromConfigurationFile() + { + String hostName = getName(); + long logFileMax = 2000000; + _host = createHostFromConfiguration(hostName, logFileMax); + _host.setDesiredState(State.INITIALISING, State.ACTIVE); + assertEquals("Unexpected host name", hostName, _host.getName()); + assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType()); + assertEquals("Unexpected store type", new BDBMessageStoreFactory().getType(), _host.getAttribute(VirtualHost.STORE_TYPE)); + assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH)); + + BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore(); + EnvironmentConfig envConfig = messageStore.getEnvironmentFacade().getEnvironment().getConfig(); + assertEquals("Unexpected JE log file max", String.valueOf(logFileMax), envConfig.getConfigParam(EnvironmentConfig.LOG_FILE_MAX)); + + } + + public void testCreateBdbHaVirtualHostFromConfigurationFile() + { + String hostName = getName(); + + String repStreamTimeout = "2 h"; + String nodeName = "node"; + String groupName = "group"; + String nodeHostPort = "localhost:" + findFreePort(); + String helperHostPort = nodeHostPort; + String durability = "NO_SYNC,SYNC,NONE"; + _host = createHaHostFromConfiguration(hostName, groupName, nodeName, nodeHostPort, helperHostPort, durability, repStreamTimeout); + _host.setDesiredState(State.INITIALISING, State.ACTIVE); + assertEquals("Unexpected host name", hostName, _host.getName()); + assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType()); + assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE)); + assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH)); + + BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore(); + ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment(); + ReplicationConfig repConfig = environment.getRepConfig(); + assertEquals("Unexpected JE replication groupName", groupName, repConfig.getConfigParam(ReplicationConfig.GROUP_NAME)); + assertEquals("Unexpected JE replication nodeName", nodeName, repConfig.getConfigParam(ReplicationConfig.NODE_NAME)); + assertEquals("Unexpected JE replication nodeHostPort", nodeHostPort, repConfig.getConfigParam(ReplicationConfig.NODE_HOST_PORT)); + assertEquals("Unexpected JE replication nodeHostPort", helperHostPort, repConfig.getConfigParam(ReplicationConfig.HELPER_HOSTS)); + assertEquals("Unexpected JE replication nodeHostPort", "false", repConfig.getConfigParam(ReplicationConfig.DESIGNATED_PRIMARY)); + assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, repConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT)); + } + + private VirtualHost createHost(Map<String, Object> attributes, Set<UUID> children) + { + ConfigurationEntry entry = new ConfigurationEntry(UUID.randomUUID(), VirtualHost.class.getSimpleName(), attributes, + children, _store); + + return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker); + } + + private VirtualHost createHost(Map<String, Object> attributes) + { + return createHost(attributes, Collections.<UUID> emptySet()); + } + + private VirtualHost createHostFromConfiguration(String hostName, long logFileMax) + { + String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">" + + "<store><class>" + BDBMessageStore.class.getName() + "</class>" + + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>" + + "<envConfig><name>" + EnvironmentConfig.LOG_FILE_MAX + "</name><value>" + logFileMax + "</value></envConfig>" + + "</store>" + + "</" + hostName + "></virtualhost></virtualhosts>"; + Map<String, Object> attributes = writeConfigAndGenerateAttributes(content); + return createHost(attributes); + } + + + private VirtualHost createHaHostFromConfiguration(String hostName, String groupName, String nodeName, String nodeHostPort, String helperHostPort, String durability, String repStreamTimeout) + { + String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">" + + "<type>" + BDBHAVirtualHostFactory.TYPE + "</type>" + + "<store><class>" + BDBMessageStore.class.getName() + "</class>" + + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>" + + "<highAvailability>" + + "<groupName>" + groupName + "</groupName>" + + "<nodeName>" + nodeName + "</nodeName>" + + "<nodeHostPort>" + nodeHostPort + "</nodeHostPort>" + + "<helperHostPort>" + helperHostPort + "</helperHostPort>" + + "<durability>" + durability.replaceAll(",", "\\\\,") + "</durability>" + + "</highAvailability>" + + "<repConfig><name>" + ReplicationConfig.REP_STREAM_TIMEOUT + "</name><value>" + repStreamTimeout + "</value></repConfig>" + + "</store>" + + "</" + hostName + "></virtualhost></virtualhosts>"; + Map<String, Object> attributes = writeConfigAndGenerateAttributes(content); + return createHost(attributes); + } + + private Map<String, Object> writeConfigAndGenerateAttributes(String content) + { + _configFile = TestFileUtils.createTempFile(this, ".virtualhost.xml", content); + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put(VirtualHost.NAME, getName()); + attributes.put(VirtualHost.CONFIG_PATH, _configFile.getAbsolutePath()); + return attributes; + } +} + +
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java new file mode 100644 index 0000000000..cd7dd69c46 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -0,0 +1,336 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.utils.TestFileUtils; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.Durability; +import com.sleepycat.je.Environment; +import com.sleepycat.je.rep.ReplicatedEnvironment.State; +import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; + +public class ReplicatedEnvironmentFacadeTest extends QpidTestCase +{ + + private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort(); + private static final int LISTENER_TIMEOUT = 5; + private static final int WAIT_STATE_CHANGE_TIMEOUT = 30; + private static final String TEST_GROUP_NAME = "testGroupName"; + private static final String TEST_NODE_NAME = "testNodeName"; + private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT; + private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT; + private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString(); + private static final boolean TEST_DESIGNATED_PRIMARY = false; + private static final boolean TEST_COALESCING_SYNC = true; + private static final int TEST_PRIORITY = 1; + private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0; + + private File _storePath; + private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>(); + private VirtualHost _virtualHost = mock(VirtualHost.class); + + public void setUp() throws Exception + { + super.setUp(); + + TaskExecutor taskExecutor = mock(TaskExecutor.class); + when(taskExecutor.isTaskExecutorThread()).thenReturn(true); + when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor); + + _storePath = TestFileUtils.createTestDirectory("bdb", true); + + setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100"); + } + + @Override + public void tearDown() throws Exception + { + try + { + for (EnvironmentFacade ef : _nodes.values()) + { + ef.close(); + } + } + finally + { + try + { + if (_storePath != null) + { + FileUtils.delete(_storePath, true); + } + } + finally + { + super.tearDown(); + } + } + } + public void testEnvironmentFacade() throws Exception + { + EnvironmentFacade ef = createMaster(); + assertNotNull("Environment should not be null", ef); + Environment e = ef.getEnvironment(); + assertTrue("Environment is not valid", e.isValid()); + } + + public void testClose() throws Exception + { + EnvironmentFacade ef = createMaster(); + ef.close(); + Environment e = ef.getEnvironment(); + + assertNull("Environment should be null after facade close", e); + } + + public void testOpenDatabases() throws Exception + { + EnvironmentFacade ef = createMaster(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1", "test2"); + Database test1 = ef.getOpenDatabase("test1"); + Database test2 = ef.getOpenDatabase("test2"); + + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName()); + } + + public void testGetOpenDatabaseForNonExistingDatabase() throws Exception + { + EnvironmentFacade ef = createMaster(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1"); + Database test1 = ef.getOpenDatabase("test1"); + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + try + { + ef.getOpenDatabase("test2"); + fail("An exception should be thrown for the non existing database"); + } + catch(IllegalArgumentException e) + { + assertEquals("Unexpected exception message", "Database with name 'test2' has never been requested to be opened", e.getMessage()); + } + } + + public void testGetGroupName() throws Exception + { + assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName()); + } + + public void testGetNodeName() throws Exception + { + assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName()); + } + + public void testLastKnownReplicationTransactionId() throws Exception + { + ReplicatedEnvironmentFacade master = createMaster(); + long lastKnownReplicationTransactionId = master.getLastKnownReplicationTransactionId(); + assertTrue("Unexpected LastKnownReplicationTransactionId " + lastKnownReplicationTransactionId, lastKnownReplicationTransactionId > 0); + } + + public void testGetNodeHostPort() throws Exception + { + assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort()); + } + + public void testGetHelperHostPort() throws Exception + { + assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort()); + } + + public void testGetDurability() throws Exception + { + assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability()); + } + + public void testIsCoalescingSync() throws Exception + { + assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, createMaster().isCoalescingSync()); + } + + public void testGetNodeState() throws Exception + { + assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState()); + } + + + public void testPriority() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + assertEquals("Unexpected priority", TEST_PRIORITY, facade.getPriority()); + Future<Void> future = facade.setPriority(TEST_PRIORITY + 1); + future.get(5, TimeUnit.SECONDS); + assertEquals("Unexpected priority after change", TEST_PRIORITY + 1, facade.getPriority()); + } + + public void testDesignatedPrimary() throws Exception + { + ReplicatedEnvironmentFacade master = createMaster(); + assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); + Future<Void> future = master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY); + future.get(5, TimeUnit.SECONDS); + assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); + } + + public void testElectableGroupSizeOverride() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + assertEquals("Unexpected Electable Group Size Override", TEST_ELECTABLE_GROUP_OVERRIDE, facade.getElectableGroupSizeOverride()); + Future<Void> future = facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1); + future.get(5, TimeUnit.SECONDS); + assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride()); + } + + public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception + { + final CountDownLatch masterLatch = new CountDownLatch(1); + final AtomicInteger masterStateChangeCount = new AtomicInteger(); + final CountDownLatch unknownLatch = new CountDownLatch(1); + final AtomicInteger unknownStateChangeCount = new AtomicInteger(); + StateChangeListener stateChangeListener = new StateChangeListener() + { + @Override + public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException + { + if (stateChangeEvent.getState() == State.MASTER) + { + masterStateChangeCount.incrementAndGet(); + masterLatch.countDown(); + } + else if (stateChangeEvent.getState() == State.UNKNOWN) + { + unknownStateChangeCount.incrementAndGet(); + unknownLatch.countDown(); + } + } + }; + + addNode(State.MASTER, stateChangeListener); + assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + int replica2Port = getNextAvailable(replica1Port + 1); + String node2NodeHostPort = "localhost:" + replica2Port; + + ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); + ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort); + + // close replicas + replica1.close(); + replica2.close(); + + assertTrue("Environment should be recreated and go into unknown state", + unknownLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); + + assertEquals("Node made master an unexpected number of times", 1, masterStateChangeCount.get()); + assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get()); + } + + public void testCloseStateTransitions() throws Exception + { + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster(); + + assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState()); + replicatedEnvironmentFacade.close(); + assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState()); + } + + private ReplicatedEnvironmentFacade createMaster() throws Exception + { + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener); + assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + return env; + } + + private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort) throws Exception + { + TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA); + ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener); + boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS); + assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(), awaitForStateChange); + return replicaEnvironmentFacade; + } + + private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary, + State desiredState, StateChangeListener stateChangeListener) + { + ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary); + ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config); + ref.setStateChangeListener(stateChangeListener); + _nodes.put(nodeName, ref); + return ref; + } + + private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener) + { + return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener); + } + + private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary) + { + ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class); + when(node.getName()).thenReturn(nodeName); + when(node.getHostPort()).thenReturn(nodeHostPort); + when(node.isDesignatedPrimary()).thenReturn(designatedPrimary); + when(node.getQuorumOverride()).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE); + when(node.getPriority()).thenReturn(TEST_PRIORITY); + when(node.getGroupName()).thenReturn(TEST_GROUP_NAME); + when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT); + when(node.getDurability()).thenReturn(TEST_DURABILITY); + when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC); + + Map<String, String> repConfig = new HashMap<String, String>(); + repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s"); + repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); + when(node.getReplicationParameters()).thenReturn(repConfig); + when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath()); + return node; + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java new file mode 100644 index 0000000000..0870191b35 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import com.sleepycat.je.rep.ReplicatedEnvironment.State; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; + +class TestStateChangeListener implements StateChangeListener +{ + private final Set<State> _expectedStates; + private final CountDownLatch _latch; + private final AtomicReference<State> _currentActualState = new AtomicReference<State>(); + + public TestStateChangeListener(State expectedState) + { + this(Collections.singleton(expectedState)); + } + + public TestStateChangeListener(Set<State> expectedStates) + { + _expectedStates = new HashSet<State>(expectedStates); + _latch = new CountDownLatch(1); + } + + @Override + public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException + { + _currentActualState.set(stateChangeEvent.getState()); + if (_expectedStates.contains(stateChangeEvent.getState())) + { + _latch.countDown(); + } + } + + public boolean awaitForStateChange(long timeout, TimeUnit timeUnit) throws InterruptedException + { + return _latch.await(timeout, timeUnit); + } + + public State getCurrentActualState() + { + return _currentActualState.get(); + } +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java index 400ac12792..810f4a1fca 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java @@ -26,7 +26,7 @@ import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.OperationStatus; -import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.server.util.ServerScopedRuntimeException; public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase @@ -94,7 +94,7 @@ public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase catch(ServerScopedRuntimeException ex) { assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: " - + AbstractBDBMessageStore.VERSION, ex.getMessage()); + + BDBMessageStore.VERSION, ex.getMessage()); } } |
