summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore/src/test')
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java170
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java45
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java14
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java128
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java208
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java336
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java70
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java4
8 files changed, 748 insertions, 227 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
deleted file mode 100644
index c2b3aeab3e..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.io.File;
-import java.net.InetAddress;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.util.FileUtils;
-
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.je.rep.ReplicationConfig;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class BDBHAMessageStoreTest extends QpidTestCase
-{
- private static final String TEST_LOG_FILE_MAX = "1000000";
- private static final String TEST_ELECTION_RETRIES = "1000";
- private static final String TEST_NUMBER_OF_THREADS = "10";
- private static final String TEST_ENV_CONSISTENCY_TIMEOUT = "9999999";
- private String _groupName;
- private String _workDir;
- private int _masterPort;
- private String _host;
- private XMLConfiguration _configXml;
- private VirtualHost _virtualHost;
- private org.apache.qpid.server.model.VirtualHost _modelVhost;
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- _workDir = TMP_FOLDER + File.separator + getName();
- _host = InetAddress.getByName("localhost").getHostAddress();
- _groupName = "group" + getName();
- _masterPort = -1;
-
- FileUtils.delete(new File(_workDir), true);
- _configXml = new XMLConfiguration();
- _modelVhost = mock(org.apache.qpid.server.model.VirtualHost.class);
-
-
- BrokerTestHelper.setUp();
- }
-
- public void tearDown() throws Exception
- {
- try
- {
- if (_virtualHost != null)
- {
- _virtualHost.close();
- }
- FileUtils.delete(new File(_workDir), true);
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
- public void testSetSystemConfiguration() throws Exception
- {
- // create virtual host configuration, registry and host instance
- addVirtualHostConfiguration();
- String vhostName = "test" + _masterPort;
- VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, _configXml.subset("virtualhosts.virtualhost." + vhostName), BrokerTestHelper.createBrokerMock());
-
- _virtualHost = BrokerTestHelper.createVirtualHost(configuration,new VirtualHostRegistry(new EventLogger()),_modelVhost);
- BDBHAMessageStore store = (BDBHAMessageStore) _virtualHost.getMessageStore();
-
- // test whether JVM system settings were applied
- Environment env = store.getEnvironment();
- assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS));
- assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
-
- ReplicatedEnvironment repEnv = store.getReplicatedEnvironment();
- assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES,
- repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES));
- assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT,
- repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT));
- }
-
- private void addVirtualHostConfiguration() throws Exception
- {
- int port = findFreePort();
- if (_masterPort == -1)
- {
- _masterPort = port;
- }
- String nodeName = getNodeNameForNodeAt(port);
-
- String vhostName = "test" + port;
- String vhostPrefix = "virtualhosts.virtualhost." + vhostName;
-
- _configXml.addProperty("virtualhosts.virtualhost.name", vhostName);
- _configXml.addProperty(vhostPrefix + ".type", BDBHAVirtualHostFactory.TYPE);
-
- when(_modelVhost.getAttribute(eq(_modelVhost.STORE_PATH))).thenReturn(_workDir + File.separator
- + port);
- when(_modelVhost.getAttribute(eq("haGroupName"))).thenReturn(_groupName);
- when(_modelVhost.getAttribute(eq("haNodeName"))).thenReturn(nodeName);
- when(_modelVhost.getAttribute(eq("haNodeAddress"))).thenReturn(getNodeHostPortForNodeAt(port));
- when(_modelVhost.getAttribute(eq("haHelperAddress"))).thenReturn(getHelperHostPort());
-
- Map<String,String> bdbEnvConfig = new HashMap<String,String>();
- bdbEnvConfig.put(EnvironmentConfig.CLEANER_THREADS, TEST_NUMBER_OF_THREADS);
- bdbEnvConfig.put(EnvironmentConfig.LOG_FILE_MAX, TEST_LOG_FILE_MAX);
-
- when(_modelVhost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(bdbEnvConfig);
-
- Map<String,String> repConfig = new HashMap<String,String>();
- repConfig.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, TEST_ELECTION_RETRIES);
- repConfig.put(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT, TEST_ENV_CONSISTENCY_TIMEOUT);
- when(_modelVhost.getAttribute(eq("haReplicationConfig"))).thenReturn(repConfig);
-
- }
-
- private String getNodeNameForNodeAt(final int bdbPort)
- {
- return "node" + getName() + bdbPort;
- }
-
- private String getNodeHostPortForNodeAt(final int bdbPort)
- {
- return _host + ":" + bdbPort;
- }
-
- private String getHelperHostPort()
- {
- if (_masterPort == -1)
- {
- throw new IllegalStateException("Helper port not yet assigned.");
- }
- return _host + ":" + _masterPort;
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
deleted file mode 100644
index 7f7b65f315..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-import static org.mockito.Mockito.mock;
-
-public class HAMessageStoreSmokeTest extends QpidTestCase
-{
- private final BDBHAMessageStore _store = new BDBHAMessageStore();
-
- public void testMissingHAConfigThrowsException() throws Exception
- {
- try
- {
- _store.configure(mock(VirtualHost.class));
- fail("Expected an exception to be thrown");
- }
- catch (ServerScopedRuntimeException ce)
- {
- assertTrue(ce.getMessage().contains("BDB HA configuration key not found"));
- }
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
index 730001d849..385681446a 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
@@ -22,20 +22,14 @@ package org.apache.qpid.server.store.berkeleydb;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreCreator;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.test.utils.QpidTestCase;
public class MessageStoreCreatorTest extends QpidTestCase
{
- private static final String[] STORE_TYPES = {BDBMessageStore.TYPE};
-
public void testMessageStoreCreator()
{
MessageStoreCreator messageStoreCreator = new MessageStoreCreator();
- for (String type : STORE_TYPES)
- {
- MessageStore store = messageStoreCreator.createMessageStore(type);
- assertNotNull("Store of type " + type + " is not created", store);
- }
- }
-}
+ String type = new BDBMessageStoreFactory().getType();
+ MessageStore store = messageStoreCreator.createMessageStore(type);
+ assertNotNull("Store of type " + type + " is not created", store);
+ }}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
new file mode 100644
index 0000000000..b19e18b204
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.Collections;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Environment;
+
+public class StandardEnvironmentFacadeTest extends QpidTestCase
+{
+ protected File _storePath;
+ protected EnvironmentFacade _environmentFacade;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _storePath = new File(TMP_FOLDER + File.separator + "bdb" + File.separator + getTestName());
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ if (_environmentFacade != null)
+ {
+ _environmentFacade.close();
+ }
+ }
+ finally
+ {
+ if (_storePath != null)
+ {
+ FileUtils.delete(_storePath, true);
+ }
+ }
+ }
+
+ public void testEnvironmentFacade() throws Exception
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ assertNotNull("Environment should not be null", ef);
+ Environment e = ef.getEnvironment();
+ assertTrue("Environment is not valid", e.isValid());
+ }
+
+ public void testClose() throws Exception
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ ef.close();
+ Environment e = ef.getEnvironment();
+
+ assertNull("Environment should be null after facade close", e);
+ }
+
+ public void testOpenDatabases() throws Exception
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1", "test2");
+ Database test1 = ef.getOpenDatabase("test1");
+ Database test2 = ef.getOpenDatabase("test2");
+
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
+ }
+
+ public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1");
+ Database test1 = ef.getOpenDatabase("test1");
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ try
+ {
+ ef.getOpenDatabase("test2");
+ fail("An exception should be thrown for the non existing database");
+ }
+ catch(IllegalArgumentException e)
+ {
+ assertEquals("Unexpected exception message", "Database with name 'test2' has not been opened", e.getMessage());
+ }
+ }
+
+ EnvironmentFacade getEnvironmentFacade() throws Exception
+ {
+ if (_environmentFacade == null)
+ {
+ _environmentFacade = createEnvironmentFacade();
+ }
+ return _environmentFacade;
+ }
+
+ EnvironmentFacade createEnvironmentFacade()
+ {
+ return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
new file mode 100644
index 0000000000..a05a30b459
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.ConfigurationEntryStore;
+import org.apache.qpid.server.configuration.RecovererProvider;
+import org.apache.qpid.server.configuration.startup.VirtualHostRecoverer;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+
+public class VirtualHostTest extends QpidTestCase
+{
+
+ private Broker _broker;
+ private StatisticsGatherer _statisticsGatherer;
+ private RecovererProvider _recovererProvider;
+ private File _configFile;
+ private File _bdbStorePath;
+ private VirtualHost _host;
+ private ConfigurationEntryStore _store;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _store = mock(ConfigurationEntryStore.class);
+ _broker = BrokerTestHelper.createBrokerMock();
+ TaskExecutor taslExecutor = mock(TaskExecutor.class);
+ when(taslExecutor.isTaskExecutorThread()).thenReturn(true);
+ when(_broker.getTaskExecutor()).thenReturn(taslExecutor);
+
+
+ _statisticsGatherer = mock(StatisticsGatherer.class);
+
+ _bdbStorePath = new File(TMP_FOLDER, getTestName() + "." + System.currentTimeMillis());
+ _bdbStorePath.deleteOnExit();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ if (_host != null)
+ {
+ _host.setDesiredState(_host.getState(), State.STOPPED);
+ }
+ }
+ finally
+ {
+ if (_configFile != null)
+ {
+ _configFile.delete();
+ }
+ if (_bdbStorePath != null)
+ {
+ FileUtils.delete(_bdbStorePath, true);
+ }
+ super.tearDown();
+ }
+ }
+
+
+ public void testCreateBdbVirtualHostFromConfigurationFile()
+ {
+ String hostName = getName();
+ long logFileMax = 2000000;
+ _host = createHostFromConfiguration(hostName, logFileMax);
+ _host.setDesiredState(State.INITIALISING, State.ACTIVE);
+ assertEquals("Unexpected host name", hostName, _host.getName());
+ assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType());
+ assertEquals("Unexpected store type", new BDBMessageStoreFactory().getType(), _host.getAttribute(VirtualHost.STORE_TYPE));
+ assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+ BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
+ EnvironmentConfig envConfig = messageStore.getEnvironmentFacade().getEnvironment().getConfig();
+ assertEquals("Unexpected JE log file max", String.valueOf(logFileMax), envConfig.getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
+
+ }
+
+ public void testCreateBdbHaVirtualHostFromConfigurationFile()
+ {
+ String hostName = getName();
+
+ String repStreamTimeout = "2 h";
+ String nodeName = "node";
+ String groupName = "group";
+ String nodeHostPort = "localhost:" + findFreePort();
+ String helperHostPort = nodeHostPort;
+ String durability = "NO_SYNC,SYNC,NONE";
+ _host = createHaHostFromConfiguration(hostName, groupName, nodeName, nodeHostPort, helperHostPort, durability, repStreamTimeout);
+ _host.setDesiredState(State.INITIALISING, State.ACTIVE);
+ assertEquals("Unexpected host name", hostName, _host.getName());
+ assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType());
+ assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE));
+ assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+ BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
+ ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment();
+ ReplicationConfig repConfig = environment.getRepConfig();
+ assertEquals("Unexpected JE replication groupName", groupName, repConfig.getConfigParam(ReplicationConfig.GROUP_NAME));
+ assertEquals("Unexpected JE replication nodeName", nodeName, repConfig.getConfigParam(ReplicationConfig.NODE_NAME));
+ assertEquals("Unexpected JE replication nodeHostPort", nodeHostPort, repConfig.getConfigParam(ReplicationConfig.NODE_HOST_PORT));
+ assertEquals("Unexpected JE replication nodeHostPort", helperHostPort, repConfig.getConfigParam(ReplicationConfig.HELPER_HOSTS));
+ assertEquals("Unexpected JE replication nodeHostPort", "false", repConfig.getConfigParam(ReplicationConfig.DESIGNATED_PRIMARY));
+ assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, repConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
+ }
+
+ private VirtualHost createHost(Map<String, Object> attributes, Set<UUID> children)
+ {
+ ConfigurationEntry entry = new ConfigurationEntry(UUID.randomUUID(), VirtualHost.class.getSimpleName(), attributes,
+ children, _store);
+
+ return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker);
+ }
+
+ private VirtualHost createHost(Map<String, Object> attributes)
+ {
+ return createHost(attributes, Collections.<UUID> emptySet());
+ }
+
+ private VirtualHost createHostFromConfiguration(String hostName, long logFileMax)
+ {
+ String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
+ + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
+ + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
+ + "<envConfig><name>" + EnvironmentConfig.LOG_FILE_MAX + "</name><value>" + logFileMax + "</value></envConfig>"
+ + "</store>"
+ + "</" + hostName + "></virtualhost></virtualhosts>";
+ Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
+ return createHost(attributes);
+ }
+
+
+ private VirtualHost createHaHostFromConfiguration(String hostName, String groupName, String nodeName, String nodeHostPort, String helperHostPort, String durability, String repStreamTimeout)
+ {
+ String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
+ + "<type>" + BDBHAVirtualHostFactory.TYPE + "</type>"
+ + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
+ + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
+ + "<highAvailability>"
+ + "<groupName>" + groupName + "</groupName>"
+ + "<nodeName>" + nodeName + "</nodeName>"
+ + "<nodeHostPort>" + nodeHostPort + "</nodeHostPort>"
+ + "<helperHostPort>" + helperHostPort + "</helperHostPort>"
+ + "<durability>" + durability.replaceAll(",", "\\\\,") + "</durability>"
+ + "</highAvailability>"
+ + "<repConfig><name>" + ReplicationConfig.REP_STREAM_TIMEOUT + "</name><value>" + repStreamTimeout + "</value></repConfig>"
+ + "</store>"
+ + "</" + hostName + "></virtualhost></virtualhosts>";
+ Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
+ return createHost(attributes);
+ }
+
+ private Map<String, Object> writeConfigAndGenerateAttributes(String content)
+ {
+ _configFile = TestFileUtils.createTempFile(this, ".virtualhost.xml", content);
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(VirtualHost.NAME, getName());
+ attributes.put(VirtualHost.CONFIG_PATH, _configFile.getAbsolutePath());
+ return attributes;
+ }
+}
+
+ \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
new file mode 100644
index 0000000000..cd7dd69c46
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -0,0 +1,336 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.rep.ReplicatedEnvironment.State;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
+public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
+{
+
+ private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
+ private static final int LISTENER_TIMEOUT = 5;
+ private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
+ private static final String TEST_GROUP_NAME = "testGroupName";
+ private static final String TEST_NODE_NAME = "testNodeName";
+ private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
+ private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
+ private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString();
+ private static final boolean TEST_DESIGNATED_PRIMARY = false;
+ private static final boolean TEST_COALESCING_SYNC = true;
+ private static final int TEST_PRIORITY = 1;
+ private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0;
+
+ private File _storePath;
+ private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
+ private VirtualHost _virtualHost = mock(VirtualHost.class);
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ TaskExecutor taskExecutor = mock(TaskExecutor.class);
+ when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
+ when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor);
+
+ _storePath = TestFileUtils.createTestDirectory("bdb", true);
+
+ setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100");
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ for (EnvironmentFacade ef : _nodes.values())
+ {
+ ef.close();
+ }
+ }
+ finally
+ {
+ try
+ {
+ if (_storePath != null)
+ {
+ FileUtils.delete(_storePath, true);
+ }
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+ }
+ public void testEnvironmentFacade() throws Exception
+ {
+ EnvironmentFacade ef = createMaster();
+ assertNotNull("Environment should not be null", ef);
+ Environment e = ef.getEnvironment();
+ assertTrue("Environment is not valid", e.isValid());
+ }
+
+ public void testClose() throws Exception
+ {
+ EnvironmentFacade ef = createMaster();
+ ef.close();
+ Environment e = ef.getEnvironment();
+
+ assertNull("Environment should be null after facade close", e);
+ }
+
+ public void testOpenDatabases() throws Exception
+ {
+ EnvironmentFacade ef = createMaster();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1", "test2");
+ Database test1 = ef.getOpenDatabase("test1");
+ Database test2 = ef.getOpenDatabase("test2");
+
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
+ }
+
+ public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
+ {
+ EnvironmentFacade ef = createMaster();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1");
+ Database test1 = ef.getOpenDatabase("test1");
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ try
+ {
+ ef.getOpenDatabase("test2");
+ fail("An exception should be thrown for the non existing database");
+ }
+ catch(IllegalArgumentException e)
+ {
+ assertEquals("Unexpected exception message", "Database with name 'test2' has never been requested to be opened", e.getMessage());
+ }
+ }
+
+ public void testGetGroupName() throws Exception
+ {
+ assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName());
+ }
+
+ public void testGetNodeName() throws Exception
+ {
+ assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName());
+ }
+
+ public void testLastKnownReplicationTransactionId() throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ long lastKnownReplicationTransactionId = master.getLastKnownReplicationTransactionId();
+ assertTrue("Unexpected LastKnownReplicationTransactionId " + lastKnownReplicationTransactionId, lastKnownReplicationTransactionId > 0);
+ }
+
+ public void testGetNodeHostPort() throws Exception
+ {
+ assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort());
+ }
+
+ public void testGetHelperHostPort() throws Exception
+ {
+ assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort());
+ }
+
+ public void testGetDurability() throws Exception
+ {
+ assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability());
+ }
+
+ public void testIsCoalescingSync() throws Exception
+ {
+ assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, createMaster().isCoalescingSync());
+ }
+
+ public void testGetNodeState() throws Exception
+ {
+ assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState());
+ }
+
+
+ public void testPriority() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ assertEquals("Unexpected priority", TEST_PRIORITY, facade.getPriority());
+ Future<Void> future = facade.setPriority(TEST_PRIORITY + 1);
+ future.get(5, TimeUnit.SECONDS);
+ assertEquals("Unexpected priority after change", TEST_PRIORITY + 1, facade.getPriority());
+ }
+
+ public void testDesignatedPrimary() throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
+ Future<Void> future = master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY);
+ future.get(5, TimeUnit.SECONDS);
+ assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
+ }
+
+ public void testElectableGroupSizeOverride() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ assertEquals("Unexpected Electable Group Size Override", TEST_ELECTABLE_GROUP_OVERRIDE, facade.getElectableGroupSizeOverride());
+ Future<Void> future = facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1);
+ future.get(5, TimeUnit.SECONDS);
+ assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride());
+ }
+
+ public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception
+ {
+ final CountDownLatch masterLatch = new CountDownLatch(1);
+ final AtomicInteger masterStateChangeCount = new AtomicInteger();
+ final CountDownLatch unknownLatch = new CountDownLatch(1);
+ final AtomicInteger unknownStateChangeCount = new AtomicInteger();
+ StateChangeListener stateChangeListener = new StateChangeListener()
+ {
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ {
+ if (stateChangeEvent.getState() == State.MASTER)
+ {
+ masterStateChangeCount.incrementAndGet();
+ masterLatch.countDown();
+ }
+ else if (stateChangeEvent.getState() == State.UNKNOWN)
+ {
+ unknownStateChangeCount.incrementAndGet();
+ unknownLatch.countDown();
+ }
+ }
+ };
+
+ addNode(State.MASTER, stateChangeListener);
+ assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node1NodeHostPort = "localhost:" + replica1Port;
+ int replica2Port = getNextAvailable(replica1Port + 1);
+ String node2NodeHostPort = "localhost:" + replica2Port;
+
+ ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+ ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort);
+
+ // close replicas
+ replica1.close();
+ replica2.close();
+
+ assertTrue("Environment should be recreated and go into unknown state",
+ unknownLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+
+ assertEquals("Node made master an unexpected number of times", 1, masterStateChangeCount.get());
+ assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get());
+ }
+
+ public void testCloseStateTransitions() throws Exception
+ {
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
+
+ assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState());
+ replicatedEnvironmentFacade.close();
+ assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
+ }
+
+ private ReplicatedEnvironmentFacade createMaster() throws Exception
+ {
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener);
+ assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+ return env;
+ }
+
+ private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort) throws Exception
+ {
+ TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA);
+ ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+ boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS);
+ assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(), awaitForStateChange);
+ return replicaEnvironmentFacade;
+ }
+
+ private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
+ State desiredState, StateChangeListener stateChangeListener)
+ {
+ ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
+ ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
+ ref.setStateChangeListener(stateChangeListener);
+ _nodes.put(nodeName, ref);
+ return ref;
+ }
+
+ private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener)
+ {
+ return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener);
+ }
+
+ private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary)
+ {
+ ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class);
+ when(node.getName()).thenReturn(nodeName);
+ when(node.getHostPort()).thenReturn(nodeHostPort);
+ when(node.isDesignatedPrimary()).thenReturn(designatedPrimary);
+ when(node.getQuorumOverride()).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE);
+ when(node.getPriority()).thenReturn(TEST_PRIORITY);
+ when(node.getGroupName()).thenReturn(TEST_GROUP_NAME);
+ when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT);
+ when(node.getDurability()).thenReturn(TEST_DURABILITY);
+ when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC);
+
+ Map<String, String> repConfig = new HashMap<String, String>();
+ repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
+ repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
+ when(node.getReplicationParameters()).thenReturn(repConfig);
+ when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
+ return node;
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java
new file mode 100644
index 0000000000..0870191b35
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.sleepycat.je.rep.ReplicatedEnvironment.State;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
+class TestStateChangeListener implements StateChangeListener
+{
+ private final Set<State> _expectedStates;
+ private final CountDownLatch _latch;
+ private final AtomicReference<State> _currentActualState = new AtomicReference<State>();
+
+ public TestStateChangeListener(State expectedState)
+ {
+ this(Collections.singleton(expectedState));
+ }
+
+ public TestStateChangeListener(Set<State> expectedStates)
+ {
+ _expectedStates = new HashSet<State>(expectedStates);
+ _latch = new CountDownLatch(1);
+ }
+
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ {
+ _currentActualState.set(stateChangeEvent.getState());
+ if (_expectedStates.contains(stateChangeEvent.getState()))
+ {
+ _latch.countDown();
+ }
+ }
+
+ public boolean awaitForStateChange(long timeout, TimeUnit timeUnit) throws InterruptedException
+ {
+ return _latch.await(timeout, timeUnit);
+ }
+
+ public State getCurrentActualState()
+ {
+ return _currentActualState.get();
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
index 400ac12792..810f4a1fca 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
@@ -26,7 +26,7 @@ import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.OperationStatus;
-import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase
@@ -94,7 +94,7 @@ public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase
catch(ServerScopedRuntimeException ex)
{
assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: "
- + AbstractBDBMessageStore.VERSION, ex.getMessage());
+ + BDBMessageStore.VERSION, ex.getMessage());
}
}