diff options
Diffstat (limited to 'qpid/java/bdbstore/src/test')
8 files changed, 748 insertions, 227 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java deleted file mode 100644 index c2b3aeab3e..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.net.InetAddress; - -import java.util.HashMap; -import java.util.Map; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.util.FileUtils; - -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.rep.ReplicatedEnvironment; -import com.sleepycat.je.rep.ReplicationConfig; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class BDBHAMessageStoreTest extends QpidTestCase -{ - private static final String TEST_LOG_FILE_MAX = "1000000"; - private static final String TEST_ELECTION_RETRIES = "1000"; - private static final String TEST_NUMBER_OF_THREADS = "10"; - private static final String TEST_ENV_CONSISTENCY_TIMEOUT = "9999999"; - private String _groupName; - private String _workDir; - private int _masterPort; - private String _host; - private XMLConfiguration _configXml; - private VirtualHost _virtualHost; - private org.apache.qpid.server.model.VirtualHost _modelVhost; - - public void setUp() throws Exception - { - super.setUp(); - - _workDir = TMP_FOLDER + File.separator + getName(); - _host = InetAddress.getByName("localhost").getHostAddress(); - _groupName = "group" + getName(); - _masterPort = -1; - - FileUtils.delete(new File(_workDir), true); - _configXml = new XMLConfiguration(); - _modelVhost = mock(org.apache.qpid.server.model.VirtualHost.class); - - - BrokerTestHelper.setUp(); - } - - public void tearDown() throws Exception - { - try - { - if (_virtualHost != null) - { - _virtualHost.close(); - } - FileUtils.delete(new File(_workDir), true); - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - public void testSetSystemConfiguration() throws Exception - { - // create virtual host configuration, registry and host instance - addVirtualHostConfiguration(); - String vhostName = "test" + _masterPort; - VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, _configXml.subset("virtualhosts.virtualhost." + vhostName), BrokerTestHelper.createBrokerMock()); - - _virtualHost = BrokerTestHelper.createVirtualHost(configuration,new VirtualHostRegistry(new EventLogger()),_modelVhost); - BDBHAMessageStore store = (BDBHAMessageStore) _virtualHost.getMessageStore(); - - // test whether JVM system settings were applied - Environment env = store.getEnvironment(); - assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS)); - assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX)); - - ReplicatedEnvironment repEnv = store.getReplicatedEnvironment(); - assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES, - repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES)); - assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT, - repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT)); - } - - private void addVirtualHostConfiguration() throws Exception - { - int port = findFreePort(); - if (_masterPort == -1) - { - _masterPort = port; - } - String nodeName = getNodeNameForNodeAt(port); - - String vhostName = "test" + port; - String vhostPrefix = "virtualhosts.virtualhost." + vhostName; - - _configXml.addProperty("virtualhosts.virtualhost.name", vhostName); - _configXml.addProperty(vhostPrefix + ".type", BDBHAVirtualHostFactory.TYPE); - - when(_modelVhost.getAttribute(eq(_modelVhost.STORE_PATH))).thenReturn(_workDir + File.separator - + port); - when(_modelVhost.getAttribute(eq("haGroupName"))).thenReturn(_groupName); - when(_modelVhost.getAttribute(eq("haNodeName"))).thenReturn(nodeName); - when(_modelVhost.getAttribute(eq("haNodeAddress"))).thenReturn(getNodeHostPortForNodeAt(port)); - when(_modelVhost.getAttribute(eq("haHelperAddress"))).thenReturn(getHelperHostPort()); - - Map<String,String> bdbEnvConfig = new HashMap<String,String>(); - bdbEnvConfig.put(EnvironmentConfig.CLEANER_THREADS, TEST_NUMBER_OF_THREADS); - bdbEnvConfig.put(EnvironmentConfig.LOG_FILE_MAX, TEST_LOG_FILE_MAX); - - when(_modelVhost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(bdbEnvConfig); - - Map<String,String> repConfig = new HashMap<String,String>(); - repConfig.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, TEST_ELECTION_RETRIES); - repConfig.put(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT, TEST_ENV_CONSISTENCY_TIMEOUT); - when(_modelVhost.getAttribute(eq("haReplicationConfig"))).thenReturn(repConfig); - - } - - private String getNodeNameForNodeAt(final int bdbPort) - { - return "node" + getName() + bdbPort; - } - - private String getNodeHostPortForNodeAt(final int bdbPort) - { - return _host + ":" + bdbPort; - } - - private String getHelperHostPort() - { - if (_masterPort == -1) - { - throw new IllegalStateException("Helper port not yet assigned."); - } - return _host + ":" + _masterPort; - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java deleted file mode 100644 index 7f7b65f315..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.test.utils.QpidTestCase; - -import static org.mockito.Mockito.mock; - -public class HAMessageStoreSmokeTest extends QpidTestCase -{ - private final BDBHAMessageStore _store = new BDBHAMessageStore(); - - public void testMissingHAConfigThrowsException() throws Exception - { - try - { - _store.configure(mock(VirtualHost.class)); - fail("Expected an exception to be thrown"); - } - catch (ServerScopedRuntimeException ce) - { - assertTrue(ce.getMessage().contains("BDB HA configuration key not found")); - } - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java index 730001d849..385681446a 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java @@ -22,20 +22,14 @@ package org.apache.qpid.server.store.berkeleydb; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreCreator; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.test.utils.QpidTestCase; public class MessageStoreCreatorTest extends QpidTestCase { - private static final String[] STORE_TYPES = {BDBMessageStore.TYPE}; - public void testMessageStoreCreator() { MessageStoreCreator messageStoreCreator = new MessageStoreCreator(); - for (String type : STORE_TYPES) - { - MessageStore store = messageStoreCreator.createMessageStore(type); - assertNotNull("Store of type " + type + " is not created", store); - } - } -} + String type = new BDBMessageStoreFactory().getType(); + MessageStore store = messageStoreCreator.createMessageStore(type); + assertNotNull("Store of type " + type + " is not created", store); + }} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java new file mode 100644 index 0000000000..b19e18b204 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.util.Collections; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.Environment; + +public class StandardEnvironmentFacadeTest extends QpidTestCase +{ + protected File _storePath; + protected EnvironmentFacade _environmentFacade; + + protected void setUp() throws Exception + { + super.setUp(); + _storePath = new File(TMP_FOLDER + File.separator + "bdb" + File.separator + getTestName()); + } + + protected void tearDown() throws Exception + { + try + { + super.tearDown(); + if (_environmentFacade != null) + { + _environmentFacade.close(); + } + } + finally + { + if (_storePath != null) + { + FileUtils.delete(_storePath, true); + } + } + } + + public void testEnvironmentFacade() throws Exception + { + EnvironmentFacade ef = getEnvironmentFacade(); + assertNotNull("Environment should not be null", ef); + Environment e = ef.getEnvironment(); + assertTrue("Environment is not valid", e.isValid()); + } + + public void testClose() throws Exception + { + EnvironmentFacade ef = getEnvironmentFacade(); + ef.close(); + Environment e = ef.getEnvironment(); + + assertNull("Environment should be null after facade close", e); + } + + public void testOpenDatabases() throws Exception + { + EnvironmentFacade ef = getEnvironmentFacade(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1", "test2"); + Database test1 = ef.getOpenDatabase("test1"); + Database test2 = ef.getOpenDatabase("test2"); + + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName()); + } + + public void testGetOpenDatabaseForNonExistingDatabase() throws Exception + { + EnvironmentFacade ef = getEnvironmentFacade(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1"); + Database test1 = ef.getOpenDatabase("test1"); + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + try + { + ef.getOpenDatabase("test2"); + fail("An exception should be thrown for the non existing database"); + } + catch(IllegalArgumentException e) + { + assertEquals("Unexpected exception message", "Database with name 'test2' has not been opened", e.getMessage()); + } + } + + EnvironmentFacade getEnvironmentFacade() throws Exception + { + if (_environmentFacade == null) + { + _environmentFacade = createEnvironmentFacade(); + } + return _environmentFacade; + } + + EnvironmentFacade createEnvironmentFacade() + { + return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap()); + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java new file mode 100644 index 0000000000..a05a30b459 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java @@ -0,0 +1,208 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.qpid.server.configuration.ConfigurationEntry; +import org.apache.qpid.server.configuration.ConfigurationEntryStore; +import org.apache.qpid.server.configuration.RecovererProvider; +import org.apache.qpid.server.configuration.startup.VirtualHostRecoverer; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.utils.TestFileUtils; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; + +public class VirtualHostTest extends QpidTestCase +{ + + private Broker _broker; + private StatisticsGatherer _statisticsGatherer; + private RecovererProvider _recovererProvider; + private File _configFile; + private File _bdbStorePath; + private VirtualHost _host; + private ConfigurationEntryStore _store; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _store = mock(ConfigurationEntryStore.class); + _broker = BrokerTestHelper.createBrokerMock(); + TaskExecutor taslExecutor = mock(TaskExecutor.class); + when(taslExecutor.isTaskExecutorThread()).thenReturn(true); + when(_broker.getTaskExecutor()).thenReturn(taslExecutor); + + + _statisticsGatherer = mock(StatisticsGatherer.class); + + _bdbStorePath = new File(TMP_FOLDER, getTestName() + "." + System.currentTimeMillis()); + _bdbStorePath.deleteOnExit(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + if (_host != null) + { + _host.setDesiredState(_host.getState(), State.STOPPED); + } + } + finally + { + if (_configFile != null) + { + _configFile.delete(); + } + if (_bdbStorePath != null) + { + FileUtils.delete(_bdbStorePath, true); + } + super.tearDown(); + } + } + + + public void testCreateBdbVirtualHostFromConfigurationFile() + { + String hostName = getName(); + long logFileMax = 2000000; + _host = createHostFromConfiguration(hostName, logFileMax); + _host.setDesiredState(State.INITIALISING, State.ACTIVE); + assertEquals("Unexpected host name", hostName, _host.getName()); + assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType()); + assertEquals("Unexpected store type", new BDBMessageStoreFactory().getType(), _host.getAttribute(VirtualHost.STORE_TYPE)); + assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH)); + + BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore(); + EnvironmentConfig envConfig = messageStore.getEnvironmentFacade().getEnvironment().getConfig(); + assertEquals("Unexpected JE log file max", String.valueOf(logFileMax), envConfig.getConfigParam(EnvironmentConfig.LOG_FILE_MAX)); + + } + + public void testCreateBdbHaVirtualHostFromConfigurationFile() + { + String hostName = getName(); + + String repStreamTimeout = "2 h"; + String nodeName = "node"; + String groupName = "group"; + String nodeHostPort = "localhost:" + findFreePort(); + String helperHostPort = nodeHostPort; + String durability = "NO_SYNC,SYNC,NONE"; + _host = createHaHostFromConfiguration(hostName, groupName, nodeName, nodeHostPort, helperHostPort, durability, repStreamTimeout); + _host.setDesiredState(State.INITIALISING, State.ACTIVE); + assertEquals("Unexpected host name", hostName, _host.getName()); + assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType()); + assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE)); + assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH)); + + BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore(); + ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment(); + ReplicationConfig repConfig = environment.getRepConfig(); + assertEquals("Unexpected JE replication groupName", groupName, repConfig.getConfigParam(ReplicationConfig.GROUP_NAME)); + assertEquals("Unexpected JE replication nodeName", nodeName, repConfig.getConfigParam(ReplicationConfig.NODE_NAME)); + assertEquals("Unexpected JE replication nodeHostPort", nodeHostPort, repConfig.getConfigParam(ReplicationConfig.NODE_HOST_PORT)); + assertEquals("Unexpected JE replication nodeHostPort", helperHostPort, repConfig.getConfigParam(ReplicationConfig.HELPER_HOSTS)); + assertEquals("Unexpected JE replication nodeHostPort", "false", repConfig.getConfigParam(ReplicationConfig.DESIGNATED_PRIMARY)); + assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, repConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT)); + } + + private VirtualHost createHost(Map<String, Object> attributes, Set<UUID> children) + { + ConfigurationEntry entry = new ConfigurationEntry(UUID.randomUUID(), VirtualHost.class.getSimpleName(), attributes, + children, _store); + + return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker); + } + + private VirtualHost createHost(Map<String, Object> attributes) + { + return createHost(attributes, Collections.<UUID> emptySet()); + } + + private VirtualHost createHostFromConfiguration(String hostName, long logFileMax) + { + String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">" + + "<store><class>" + BDBMessageStore.class.getName() + "</class>" + + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>" + + "<envConfig><name>" + EnvironmentConfig.LOG_FILE_MAX + "</name><value>" + logFileMax + "</value></envConfig>" + + "</store>" + + "</" + hostName + "></virtualhost></virtualhosts>"; + Map<String, Object> attributes = writeConfigAndGenerateAttributes(content); + return createHost(attributes); + } + + + private VirtualHost createHaHostFromConfiguration(String hostName, String groupName, String nodeName, String nodeHostPort, String helperHostPort, String durability, String repStreamTimeout) + { + String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">" + + "<type>" + BDBHAVirtualHostFactory.TYPE + "</type>" + + "<store><class>" + BDBMessageStore.class.getName() + "</class>" + + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>" + + "<highAvailability>" + + "<groupName>" + groupName + "</groupName>" + + "<nodeName>" + nodeName + "</nodeName>" + + "<nodeHostPort>" + nodeHostPort + "</nodeHostPort>" + + "<helperHostPort>" + helperHostPort + "</helperHostPort>" + + "<durability>" + durability.replaceAll(",", "\\\\,") + "</durability>" + + "</highAvailability>" + + "<repConfig><name>" + ReplicationConfig.REP_STREAM_TIMEOUT + "</name><value>" + repStreamTimeout + "</value></repConfig>" + + "</store>" + + "</" + hostName + "></virtualhost></virtualhosts>"; + Map<String, Object> attributes = writeConfigAndGenerateAttributes(content); + return createHost(attributes); + } + + private Map<String, Object> writeConfigAndGenerateAttributes(String content) + { + _configFile = TestFileUtils.createTempFile(this, ".virtualhost.xml", content); + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put(VirtualHost.NAME, getName()); + attributes.put(VirtualHost.CONFIG_PATH, _configFile.getAbsolutePath()); + return attributes; + } +} + +
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java new file mode 100644 index 0000000000..cd7dd69c46 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -0,0 +1,336 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.utils.TestFileUtils; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.Durability; +import com.sleepycat.je.Environment; +import com.sleepycat.je.rep.ReplicatedEnvironment.State; +import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; + +public class ReplicatedEnvironmentFacadeTest extends QpidTestCase +{ + + private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort(); + private static final int LISTENER_TIMEOUT = 5; + private static final int WAIT_STATE_CHANGE_TIMEOUT = 30; + private static final String TEST_GROUP_NAME = "testGroupName"; + private static final String TEST_NODE_NAME = "testNodeName"; + private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT; + private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT; + private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString(); + private static final boolean TEST_DESIGNATED_PRIMARY = false; + private static final boolean TEST_COALESCING_SYNC = true; + private static final int TEST_PRIORITY = 1; + private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0; + + private File _storePath; + private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>(); + private VirtualHost _virtualHost = mock(VirtualHost.class); + + public void setUp() throws Exception + { + super.setUp(); + + TaskExecutor taskExecutor = mock(TaskExecutor.class); + when(taskExecutor.isTaskExecutorThread()).thenReturn(true); + when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor); + + _storePath = TestFileUtils.createTestDirectory("bdb", true); + + setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100"); + } + + @Override + public void tearDown() throws Exception + { + try + { + for (EnvironmentFacade ef : _nodes.values()) + { + ef.close(); + } + } + finally + { + try + { + if (_storePath != null) + { + FileUtils.delete(_storePath, true); + } + } + finally + { + super.tearDown(); + } + } + } + public void testEnvironmentFacade() throws Exception + { + EnvironmentFacade ef = createMaster(); + assertNotNull("Environment should not be null", ef); + Environment e = ef.getEnvironment(); + assertTrue("Environment is not valid", e.isValid()); + } + + public void testClose() throws Exception + { + EnvironmentFacade ef = createMaster(); + ef.close(); + Environment e = ef.getEnvironment(); + + assertNull("Environment should be null after facade close", e); + } + + public void testOpenDatabases() throws Exception + { + EnvironmentFacade ef = createMaster(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1", "test2"); + Database test1 = ef.getOpenDatabase("test1"); + Database test2 = ef.getOpenDatabase("test2"); + + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName()); + } + + public void testGetOpenDatabaseForNonExistingDatabase() throws Exception + { + EnvironmentFacade ef = createMaster(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1"); + Database test1 = ef.getOpenDatabase("test1"); + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + try + { + ef.getOpenDatabase("test2"); + fail("An exception should be thrown for the non existing database"); + } + catch(IllegalArgumentException e) + { + assertEquals("Unexpected exception message", "Database with name 'test2' has never been requested to be opened", e.getMessage()); + } + } + + public void testGetGroupName() throws Exception + { + assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName()); + } + + public void testGetNodeName() throws Exception + { + assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName()); + } + + public void testLastKnownReplicationTransactionId() throws Exception + { + ReplicatedEnvironmentFacade master = createMaster(); + long lastKnownReplicationTransactionId = master.getLastKnownReplicationTransactionId(); + assertTrue("Unexpected LastKnownReplicationTransactionId " + lastKnownReplicationTransactionId, lastKnownReplicationTransactionId > 0); + } + + public void testGetNodeHostPort() throws Exception + { + assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort()); + } + + public void testGetHelperHostPort() throws Exception + { + assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort()); + } + + public void testGetDurability() throws Exception + { + assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability()); + } + + public void testIsCoalescingSync() throws Exception + { + assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, createMaster().isCoalescingSync()); + } + + public void testGetNodeState() throws Exception + { + assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState()); + } + + + public void testPriority() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + assertEquals("Unexpected priority", TEST_PRIORITY, facade.getPriority()); + Future<Void> future = facade.setPriority(TEST_PRIORITY + 1); + future.get(5, TimeUnit.SECONDS); + assertEquals("Unexpected priority after change", TEST_PRIORITY + 1, facade.getPriority()); + } + + public void testDesignatedPrimary() throws Exception + { + ReplicatedEnvironmentFacade master = createMaster(); + assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); + Future<Void> future = master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY); + future.get(5, TimeUnit.SECONDS); + assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); + } + + public void testElectableGroupSizeOverride() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + assertEquals("Unexpected Electable Group Size Override", TEST_ELECTABLE_GROUP_OVERRIDE, facade.getElectableGroupSizeOverride()); + Future<Void> future = facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1); + future.get(5, TimeUnit.SECONDS); + assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride()); + } + + public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception + { + final CountDownLatch masterLatch = new CountDownLatch(1); + final AtomicInteger masterStateChangeCount = new AtomicInteger(); + final CountDownLatch unknownLatch = new CountDownLatch(1); + final AtomicInteger unknownStateChangeCount = new AtomicInteger(); + StateChangeListener stateChangeListener = new StateChangeListener() + { + @Override + public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException + { + if (stateChangeEvent.getState() == State.MASTER) + { + masterStateChangeCount.incrementAndGet(); + masterLatch.countDown(); + } + else if (stateChangeEvent.getState() == State.UNKNOWN) + { + unknownStateChangeCount.incrementAndGet(); + unknownLatch.countDown(); + } + } + }; + + addNode(State.MASTER, stateChangeListener); + assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + int replica2Port = getNextAvailable(replica1Port + 1); + String node2NodeHostPort = "localhost:" + replica2Port; + + ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); + ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort); + + // close replicas + replica1.close(); + replica2.close(); + + assertTrue("Environment should be recreated and go into unknown state", + unknownLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); + + assertEquals("Node made master an unexpected number of times", 1, masterStateChangeCount.get()); + assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get()); + } + + public void testCloseStateTransitions() throws Exception + { + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster(); + + assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState()); + replicatedEnvironmentFacade.close(); + assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState()); + } + + private ReplicatedEnvironmentFacade createMaster() throws Exception + { + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener); + assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + return env; + } + + private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort) throws Exception + { + TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA); + ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener); + boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS); + assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(), awaitForStateChange); + return replicaEnvironmentFacade; + } + + private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary, + State desiredState, StateChangeListener stateChangeListener) + { + ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary); + ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config); + ref.setStateChangeListener(stateChangeListener); + _nodes.put(nodeName, ref); + return ref; + } + + private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener) + { + return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener); + } + + private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary) + { + ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class); + when(node.getName()).thenReturn(nodeName); + when(node.getHostPort()).thenReturn(nodeHostPort); + when(node.isDesignatedPrimary()).thenReturn(designatedPrimary); + when(node.getQuorumOverride()).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE); + when(node.getPriority()).thenReturn(TEST_PRIORITY); + when(node.getGroupName()).thenReturn(TEST_GROUP_NAME); + when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT); + when(node.getDurability()).thenReturn(TEST_DURABILITY); + when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC); + + Map<String, String> repConfig = new HashMap<String, String>(); + repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s"); + repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); + when(node.getReplicationParameters()).thenReturn(repConfig); + when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath()); + return node; + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java new file mode 100644 index 0000000000..0870191b35 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import com.sleepycat.je.rep.ReplicatedEnvironment.State; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; + +class TestStateChangeListener implements StateChangeListener +{ + private final Set<State> _expectedStates; + private final CountDownLatch _latch; + private final AtomicReference<State> _currentActualState = new AtomicReference<State>(); + + public TestStateChangeListener(State expectedState) + { + this(Collections.singleton(expectedState)); + } + + public TestStateChangeListener(Set<State> expectedStates) + { + _expectedStates = new HashSet<State>(expectedStates); + _latch = new CountDownLatch(1); + } + + @Override + public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException + { + _currentActualState.set(stateChangeEvent.getState()); + if (_expectedStates.contains(stateChangeEvent.getState())) + { + _latch.countDown(); + } + } + + public boolean awaitForStateChange(long timeout, TimeUnit timeUnit) throws InterruptedException + { + return _latch.await(timeout, timeUnit); + } + + public State getCurrentActualState() + { + return _currentActualState.get(); + } +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java index 400ac12792..810f4a1fca 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java @@ -26,7 +26,7 @@ import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.OperationStatus; -import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.server.util.ServerScopedRuntimeException; public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase @@ -94,7 +94,7 @@ public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase catch(ServerScopedRuntimeException ex) { assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: " - + AbstractBDBMessageStore.VERSION, ex.getMessage()); + + BDBMessageStore.VERSION, ex.getMessage()); } } |
