diff options
| author | Keith Wall <kwall@apache.org> | 2014-01-22 16:08:18 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-01-22 16:08:18 +0000 |
| commit | c85cc192b9582d103cec0381b7e91d459e1db00c (patch) | |
| tree | 6b8906292d46262ef191ae5274767d27220da94a /qpid/java | |
| parent | dc5b63d51c8025ee797ee916495d01bbdd6c692e (diff) | |
| download | qpid-python-c85cc192b9582d103cec0381b7e91d459e1db00c.tar.gz | |
QPID-5409: Refactring of bdb ha replication functionality and fixing tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1560400 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
27 files changed, 274 insertions, 221 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java index 8351089bde..757949cf61 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java @@ -39,7 +39,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.jmx.AMQManagedObject; import org.apache.qpid.server.jmx.ManagedObject; -import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; /** * Management mbean for BDB HA. diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java index 62c11a307c..16199d30a3 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java @@ -29,7 +29,7 @@ import org.apache.qpid.server.jmx.ManagedObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; -import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; /** * This provide will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java index f8c330450c..154c25e4dd 100644 --- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java +++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java @@ -43,9 +43,9 @@ import org.apache.qpid.server.jmx.ManagedObjectRegistry; import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; -import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBean; import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; public class BDBHAMessageStoreManagerMBeanTest extends TestCase { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java index 9b97fec479..5a498470fb 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.store.berkeleydb; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.util.DbBackup; import org.apache.log4j.Logger; @@ -336,17 +335,4 @@ public class BDBBackup return backedUpFileNames.toArray(new String[backedUpFileNames.size()]); } - /* - * Creates an environment for the bdb log files in the specified directory. This envrinonment can only be used - * to backup these files, if they are not locked by another database instance. - * - * @param fromdir The path to the directory to create the environment for. - * - * @throws DatabaseException Any underlying exceptions from BDB are allowed to fall through. - */ - private Environment createSourceDirEnvironment(String fromdir) throws DatabaseException - { - // Initialize the BDB backup utility on the source directory. - return new Environment(new File(fromdir), new EnvironmentConfig()); - } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index 8d29e89472..17e04707fc 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -20,16 +20,9 @@ package org.apache.qpid.server.store.berkeleydb; * */ -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.AbstractActor; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.replication.ReplicationGroupListener; @@ -40,6 +33,8 @@ import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.OperationalLoggingListener; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider; import org.apache.qpid.server.virtualhost.State; @@ -71,7 +66,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost) throws Exception { - _messageStore = new BDBMessageStore(ReplicatedEnvironmentFacade.TYPE, new ReplicatedEnvironmentFacadeFactory()); + _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory()); final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName()); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java index 94a535ed0e..48d863530a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java @@ -83,7 +83,6 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory { LinkedHashMap<String,Object> convertedMap = new LinkedHashMap<String, Object>(); Configuration storeConfiguration = configuration.subset("store"); - convertedMap.put(org.apache.qpid.server.model.VirtualHost.STORE_PATH, storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY)); convertedMap.put(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.OVERFULL_SIZE_PROPERTY)); convertedMap.put(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY)); return convertedMap; @@ -119,6 +118,8 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory attributes.put(ReplicationNode.GROUP_NAME, groupName); attributes.put(ReplicationNode.HOST_PORT, storeConfiguration.getString("highAvailability.nodeHostPort")); attributes.put(ReplicationNode.HELPER_HOST_PORT, storeConfiguration.getString("highAvailability.helperHostPort")); + attributes.put(org.apache.qpid.server.model.VirtualHost.STORE_PATH, + storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY)); String durability = storeConfiguration.getString("highAvailability.durability"); if (durability != null) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 377ccd7a24..d69a1f9201 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -25,6 +25,7 @@ import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.je.*; import com.sleepycat.je.Transaction; + import java.io.File; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; @@ -37,6 +38,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.message.EnqueableMessage; @@ -71,7 +73,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); public static final int VERSION = 7; - public static final String TYPE = "BDB"; + public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig"; private static final int LOCK_RETRY_ATTEMPTS = 5; private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; @@ -104,7 +106,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private long _persistentSizeHighThreshold; private final EventManager _eventManager = new EventManager(); - private String _storeLocation; private final String _type; private VirtualHost _virtualHost; @@ -114,12 +115,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public BDBMessageStore() { - this(TYPE, new StandardEnvironmentFacadeFactory()); + this(new StandardEnvironmentFacadeFactory()); } - public BDBMessageStore(String type, EnvironmentFacadeFactory environmentFacadeFactory) + public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory) { - _type = type; + _type = environmentFacadeFactory.getType();; _environmentFacadeFactory = environmentFacadeFactory; _stateManager = new StateManager(_eventManager); } @@ -218,27 +219,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private void configure(VirtualHost virtualHost, boolean isMessageStore) throws AMQStoreException { - String name = virtualHost.getName(); - final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name; - - String storeLocation; - if(isMessageStore) - { - storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); - if(storeLocation == null) - { - storeLocation = defaultPath; - } - } - else // we are acting only as the durable config store - { - storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); - if(storeLocation == null) - { - storeLocation = defaultPath; - } - } - Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); @@ -253,29 +233,20 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _persistentSizeLowThreshold = _persistentSizeHighThreshold; } - File environmentPath = new File(storeLocation); - if (!environmentPath.exists()) - { - if (!environmentPath.mkdirs()) - { - throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " - + "Ensure the path is correct and that the permissions are correct."); - } - } - - _storeLocation = storeLocation; - - LOGGER.info("Setting up environment"); - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeLocation, virtualHost); + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHost, isMessageStore); - _committer = _environmentFacade.createCommitter(null); + _committer = _environmentFacade.createCommitter(virtualHost.getName()); _committer.start(); } @Override public String getStoreLocation() { - return _storeLocation; + if (_environmentFacade == null) + { + return null; + } + return _environmentFacade.getStoreLocation(); } public EnvironmentFacade getEnvironmentFacade() @@ -1695,19 +1666,21 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore @Override public void onDelete() { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleting store " + _storeLocation); - } + String storeLocation = getStoreLocation(); - if (_storeLocation != null) + if (storeLocation != null) { - File location = new File(_storeLocation); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deleting store " + storeLocation); + } + + File location = new File(storeLocation); if (location.exists()) { if (!FileUtils.delete(location, true)) { - LOGGER.error("Cannot delete " + _storeLocation); + LOGGER.error("Cannot delete " + storeLocation); } } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java index d7c8b23d39..4abe81c56c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; + import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; @@ -37,7 +38,7 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi @Override public String getType() { - return BDBMessageStore.TYPE; + return StandardEnvironmentFacade.TYPE; } @Override @@ -71,7 +72,7 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi if(initialSize != 0) { - return Collections.singletonMap("bdbEnvironmentConfig", (Object)attributes); + return Collections.singletonMap(BDBMessageStore.ENVIRONMENT_CONFIGURATION, (Object)attributes); } else { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index c2aef3d25d..60ff529203 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -56,4 +56,6 @@ public interface EnvironmentFacade void close(); + String getStoreLocation(); + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java index 8b4738b4d3..b784e436b9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java @@ -25,6 +25,8 @@ import org.apache.qpid.server.model.VirtualHost; public interface EnvironmentFacadeFactory { - EnvironmentFacade createEnvironmentFacade(String storeLocation, VirtualHost virtualHost); + EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore); + + String getType(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java new file mode 100644 index 0000000000..b13766a136 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.log4j.Logger; + +import com.sleepycat.je.ExceptionEvent; +import com.sleepycat.je.ExceptionListener; + +public class LoggingAsyncExceptionListener implements ExceptionListener +{ + private static final Logger LOGGER = Logger.getLogger(LoggingAsyncExceptionListener.class); + + @Override + public void exceptionThrown(ExceptionEvent event) + { + LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException()); + } +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index 4bf228c478..3b6eef832b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -32,21 +32,35 @@ import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.ExceptionEvent; -import com.sleepycat.je.ExceptionListener; public class StandardEnvironmentFacade implements EnvironmentFacade { private static final Logger LOGGER = Logger.getLogger(StandardEnvironmentFacade.class); public static final String TYPE = "BDB"; - private Environment _environment; + private final String _storePath; private final Map<String, Database> _databases = new HashMap<String, Database>(); + private Environment _environment; + public StandardEnvironmentFacade(String storePath, Map<String, String> attributes) { + _storePath = storePath; - LOGGER.info("BDB message store using environment path " + storePath); + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Creating environment at environment path " + _storePath); + } + + File environmentPath = new File(storePath); + if (!environmentPath.exists()) + { + if (!environmentPath.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } + } EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); @@ -60,26 +74,9 @@ public class StandardEnvironmentFacade implements EnvironmentFacade envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); - try - { - _environment = new Environment(new File(storePath), envConfig); - } - catch (DatabaseException de) - { - if (de.getMessage().contains("Environment.setAllowCreate is false")) - { - // Allow the creation this time - envConfig.setAllowCreate(true); - _environment = new Environment(new File(storePath), envConfig); - } - else - { - throw de; - } - } + _environment = new Environment(environmentPath, envConfig); } - @Override public void commit(com.sleepycat.je.Transaction tx) throws AMQStoreException { @@ -196,15 +193,6 @@ public class StandardEnvironmentFacade implements EnvironmentFacade return new AMQStoreException(contextMessage, e); } - private class LoggingAsyncExceptionListener implements ExceptionListener - { - @Override - public void exceptionThrown(ExceptionEvent event) - { - LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException()); - } - } - @Override public void openDatabases(DatabaseConfig dbConfig, String... databaseNames) { @@ -232,4 +220,10 @@ public class StandardEnvironmentFacade implements EnvironmentFacade return new CoalescingCommiter(name, this); } + @Override + public String getStoreLocation() + { + return _storePath; + } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java index 7ffee19d78..384ceba98a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java @@ -20,9 +20,11 @@ */ package org.apache.qpid.server.store.berkeleydb; +import java.io.File; import java.util.HashMap; import java.util.Map; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.VirtualHost; public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory @@ -30,17 +32,45 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor @SuppressWarnings("unchecked") @Override - public EnvironmentFacade createEnvironmentFacade(String storePath, VirtualHost virtualHost) + public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore) { Map<String, String> envConfigMap = new HashMap<String, String>(); envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS); - Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig"); - if (bdbEnvConfigAttr instanceof Map) + Object environmentConfigurationAttributes = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION); + if (environmentConfigurationAttributes instanceof Map) { - envConfigMap.putAll((Map<String, String>) bdbEnvConfigAttr); + envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes); } - return new StandardEnvironmentFacade(storePath, envConfigMap); + + String name = virtualHost.getName(); + final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name; + + String storeLocation; + if(isMessageStore) + { + storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + if(storeLocation == null) + { + storeLocation = defaultPath; + } + } + else // we are acting only as the durable config store + { + storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); + if(storeLocation == null) + { + storeLocation = defaultPath; + } + } + + return new StandardEnvironmentFacade(storeLocation, envConfigMap); + } + + @Override + public String getType() + { + return StandardEnvironmentFacade.TYPE; } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java index ca3e858f01..38fdf34196 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java @@ -25,7 +25,6 @@ import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Transaction; @@ -35,7 +34,6 @@ public class DatabasePinger public static final String PING_DATABASE_NAME = "PINGDB"; private static final int ID = 0; - public void pingDb(EnvironmentFacade facade) { try diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java index b4a83e416e..556bcc54c3 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java @@ -37,7 +37,6 @@ import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.adapter.AbstractAdapter; import org.apache.qpid.server.model.adapter.NoStatistics; -import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ParameterizedTypeImpl; @@ -110,6 +109,11 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication { throw new IllegalConfigurationException("Helper host and port attribute is not specified"); } + Object storePath = attributes.get(STORE_PATH); + if (storePath == null || storePath.equals("")) + { + throw new IllegalConfigurationException("Store path is not specified for the replication node"); + } return attributes; } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 2c8d7acc06..6c4f33364a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.server.store.berkeleydb; +package org.apache.qpid.server.store.berkeleydb.replication; import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC; import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY; @@ -28,6 +28,7 @@ import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT; import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT; import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS; import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS; +import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH; import java.io.File; import java.net.InetSocketAddress; @@ -53,9 +54,10 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.replication.ReplicationGroupListener; -import org.apache.qpid.server.store.berkeleydb.replication.DatabasePinger; -import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode; -import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory; +import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; +import org.apache.qpid.server.store.berkeleydb.Committer; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener; import org.apache.qpid.server.util.DaemonThreadFactory; import com.sleepycat.je.Database; @@ -65,8 +67,6 @@ import com.sleepycat.je.Durability; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.EnvironmentFailureException; -import com.sleepycat.je.ExceptionEvent; -import com.sleepycat.je.ExceptionListener; import com.sleepycat.je.OperationFailureException; import com.sleepycat.je.Transaction; import com.sleepycat.je.rep.InsufficientLogException; @@ -81,6 +81,7 @@ import com.sleepycat.je.rep.ReplicationNode; import com.sleepycat.je.rep.StateChangeEvent; import com.sleepycat.je.rep.StateChangeListener; import com.sleepycat.je.rep.util.ReplicationGroupAdmin; +import com.sleepycat.je.utilint.PropUtil; public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener { @@ -141,12 +142,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final Durability _durability; private final boolean _designatedPrimary; private final boolean _coalescingSync; - private final String _environmentPath; + private final File _environmentDirectory; private final Map<String, String> _environmentParameters; private final Map<String, String> _replicationEnvironmentParameters; private final ExecutorService _restartEnvironmentExecutor; private final ScheduledExecutorService _groupChangeExecutor; - private final AtomicReference<State> _state = new AtomicReference<State>(State.INITIAL); + private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING); private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>(); private final ConcurrentMap<String, RemoteReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, RemoteReplicationNode>(); private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory; @@ -158,10 +159,19 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private String _lastKnownReplicationTransactionId; @SuppressWarnings("unchecked") - public ReplicatedEnvironmentFacade(String environmentPath, org.apache.qpid.server.model.ReplicationNode replicationNode, + public ReplicatedEnvironmentFacade(org.apache.qpid.server.model.ReplicationNode replicationNode, RemoteReplicationNodeFactory remoteReplicationNodeFactory) { - _environmentPath = environmentPath; + _environmentDirectory = new File((String)replicationNode.getAttribute(STORE_PATH)); + if (!_environmentDirectory.exists()) + { + if (!_environmentDirectory.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + _environmentDirectory + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } + } + _groupName = (String)replicationNode.getAttribute(GROUP_NAME); _nodeName = replicationNode.getName(); _nodeHostPort = (String)replicationNode.getAttribute(HOST_PORT);; @@ -177,43 +187,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName)); _remoteReplicationNodeFactory = remoteReplicationNodeFactory; - _state.set(State.OPENING); _groupChangeExecutor.scheduleWithFixedDelay(new GroupChangeLearner(), 0, GROUP_CHECK_INTERVAL, TimeUnit.MILLISECONDS); _groupChangeExecutor.schedule(new RemoteNodeStateLearner(), _remoteReplicationNodeFactory.getRemoteNodeMonitorInterval(), TimeUnit.MILLISECONDS); // create environment in a separate thread to avoid renaming of the current thread by JE - Future<ReplicatedEnvironment> environmentFuture = _restartEnvironmentExecutor.submit(new Callable<ReplicatedEnvironment>(){ - @Override - public ReplicatedEnvironment call() throws Exception - { - String originalThreadName = Thread.currentThread().getName(); - try - { - return createEnvironment(); - } - finally - { - Thread.currentThread().setName(originalThreadName); - } - }}); - - // TODO: evaluate the future timeout from JE ENVIRONMENT_SETUP - try - { - _environment = environmentFuture.get(15 * 2, TimeUnit.MINUTES); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - } - catch (ExecutionException e) - { - throw new RuntimeException("Unexpected exception on environment creation", e.getCause()); - } - catch (TimeoutException e) - { - throw new RuntimeException("JE environment has not been created in due time"); - } + _environment = createEnvironment(true); populateExistingRemoteReplicationNodes(); } @@ -235,12 +213,17 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public void close() { - if (_state.compareAndSet(State.INITIAL, State.CLOSING) || _state.compareAndSet(State.OPENING, State.CLOSING) || - _state.compareAndSet(State.OPEN, State.CLOSING) || _state.compareAndSet(State.RESTARTING, State.CLOSING) ) + if (_state.compareAndSet(State.OPENING, State.CLOSING) || + _state.compareAndSet(State.OPEN, State.CLOSING) || + _state.compareAndSet(State.RESTARTING, State.CLOSING) ) { try { - LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName); + } + _restartEnvironmentExecutor.shutdown(); _groupChangeExecutor.shutdown(); closeDatabases(); @@ -321,7 +304,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private void openDatabaseInternally(String databaseName, DatabaseHolder holder) { - LOGGER.debug("Opening database " + databaseName + " on " + _prettyGroupNodeName); Database database = _environment.openDatabase(null, databaseName, holder.getConfig()); holder.setDatabase(database); } @@ -352,6 +334,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } @Override + public String getStoreLocation() + { + return _environmentDirectory.getAbsolutePath(); + } + + @Override public void stateChange(final StateChangeEvent stateChangeEvent) { _groupChangeExecutor.submit(new Runnable() @@ -381,7 +369,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { reopenDatabases(); StateChangeListener listener = _stateChangeListener.get(); - LOGGER.debug("Application state change listener " + listener); + if (listener != null) { listener.stateChange(stateChangeEvent); @@ -652,7 +640,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan closeEnvironmentSafely(); - _environment = createEnvironment(); + _environment = createEnvironment(false); if (_stateChangeListener.get() != null) { @@ -692,7 +680,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private void closeDatabases() { RuntimeException firstThrownException = null; - LOGGER.debug("Closing databases " + _databases); for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet()) { DatabaseHolder databaseHolder = entry.getValue(); @@ -701,7 +688,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { try { - LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName); + } + database.close(); } catch(RuntimeException e) @@ -724,12 +715,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - private ReplicatedEnvironment createEnvironment() + private ReplicatedEnvironment createEnvironment(boolean createEnvironmentInSeparateThread) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Creating environment"); - LOGGER.info("Environment path " + _environmentPath); + LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath()); LOGGER.info("Group name " + _groupName); LOGGER.info("Node name " + _nodeName); LOGGER.info("Node host port " + _nodeHostPort); @@ -750,7 +741,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan environmentSettings.putAll(_environmentParameters); } - final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort); + ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort); replicationConfig.setHelperHosts(_helperHostPort); replicationConfig.setDesignatedPrimary(_designatedPrimary); @@ -778,8 +769,58 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan envConfig.setConfigParam(configItem.getKey(), configItem.getValue()); } + if (createEnvironmentInSeparateThread) + { + return createEnvironmentInSeparateThread(_environmentDirectory, envConfig, replicationConfig); + } + else + { + return createEnvironment(_environmentDirectory, envConfig, replicationConfig); + } + } + + private ReplicatedEnvironment createEnvironmentInSeparateThread(final File environmentPathFile, final EnvironmentConfig envConfig, + final ReplicationConfig replicationConfig) + { + Future<ReplicatedEnvironment> environmentFuture = _restartEnvironmentExecutor.submit(new Callable<ReplicatedEnvironment>(){ + @Override + public ReplicatedEnvironment call() throws Exception + { + String originalThreadName = Thread.currentThread().getName(); + try + { + return createEnvironment(environmentPathFile, envConfig, replicationConfig); + } + finally + { + Thread.currentThread().setName(originalThreadName); + } + }}); + + long setUpTimeOutMillis = PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT)); + try + { + return environmentFuture.get(setUpTimeOutMillis, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException("Environment creation was interrupted", e); + } + catch (ExecutionException e) + { + throw new RuntimeException("Unexpected exception on environment creation", e.getCause()); + } + catch (TimeoutException e) + { + throw new RuntimeException("JE environment has not been created in due time"); + } + } + + private ReplicatedEnvironment createEnvironment(File environmentPathFile, EnvironmentConfig envConfig, + final ReplicationConfig replicationConfig) + { ReplicatedEnvironment environment = null; - File environmentPathFile = new File(_environmentPath); try { environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); @@ -875,7 +916,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - //TODO: move the class into external class private class RemoteNodeStateLearner implements Callable<Void> { private Map<String, String> _previousGroupState = Collections.emptyMap(); @@ -944,18 +984,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - private class LoggingAsyncExceptionListener implements ExceptionListener - { - @Override - public void exceptionThrown(ExceptionEvent event) - { - LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException()); - } - } - public static enum State { - INITIAL, // TODO unused remove OPENING, OPEN, RESTARTING, @@ -995,4 +1025,5 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index 324b7b647a..0ddd7134ac 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.server.store.berkeleydb; +package org.apache.qpid.server.store.berkeleydb.replication; import java.util.Collection; import java.util.HashMap; @@ -27,9 +27,8 @@ import java.util.Map; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.ReplicationNode; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNode; -import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode; -import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory; import com.sleepycat.je.Durability; import com.sleepycat.je.Durability.SyncPolicy; @@ -38,7 +37,7 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact { @Override - public EnvironmentFacade createEnvironmentFacade(String storeLocation, VirtualHost virtualHost) + public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore) { Collection<ReplicationNode> replicationNodes = virtualHost.getChildren(ReplicationNode.class); if (replicationNodes == null || replicationNodes.size() != 1) @@ -59,7 +58,7 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact + "! Please set highAvailability.coalescingSync to false in store configuration."); } - ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(storeLocation, localNode, new RemoteReplicationNodeFactoryImpl(virtualHost)); + ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(localNode, new RemoteReplicationNodeFactoryImpl(virtualHost)); ((LocalReplicationNode)localNode).setReplicatedEnvironmentFacade(facade); return facade; } @@ -89,4 +88,10 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact return (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL); } } + + @Override + public String getType() + { + return ReplicatedEnvironmentFacade.TYPE; + } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java index 4684358190..7a645a6932 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java @@ -65,7 +65,7 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); Map<String,String> envMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE); - when(virtualHost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(envMap); + when(virtualHost.getAttribute(eq(BDBMessageStore.ENVIRONMENT_CONFIGURATION))).thenReturn(envMap); when(virtualHost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE); when(virtualHost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java index 730001d849..48fb180984 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java @@ -22,20 +22,15 @@ package org.apache.qpid.server.store.berkeleydb; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreCreator; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.test.utils.QpidTestCase; public class MessageStoreCreatorTest extends QpidTestCase { - private static final String[] STORE_TYPES = {BDBMessageStore.TYPE}; - public void testMessageStoreCreator() { MessageStoreCreator messageStoreCreator = new MessageStoreCreator(); - for (String type : STORE_TYPES) - { - MessageStore store = messageStoreCreator.createMessageStore(type); - assertNotNull("Store of type " + type + " is not created", store); - } + String type = new BDBMessageStoreFactory().getType(); + MessageStore store = messageStoreCreator.createMessageStore(type); + assertNotNull("Store of type " + type + " is not created", store); } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java index 5bad51fbc3..b19e18b204 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java @@ -24,7 +24,6 @@ import java.io.File; import java.util.Collections; import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.test.utils.TestFileUtils; import org.apache.qpid.util.FileUtils; import com.sleepycat.je.Database; @@ -39,7 +38,7 @@ public class StandardEnvironmentFacadeTest extends QpidTestCase protected void setUp() throws Exception { super.setUp(); - _storePath = TestFileUtils.createTestDirectory("bdb", true); + _storePath = new File(TMP_FOLDER + File.separator + "bdb" + File.separator + getTestName()); } protected void tearDown() throws Exception diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java index 97fd927f01..7269988042 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java @@ -46,6 +46,7 @@ import org.apache.qpid.server.model.ReplicationNode; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.test.utils.QpidTestCase; @@ -185,7 +186,7 @@ public class VirtualHostTest extends QpidTestCase _host.setDesiredState(State.INITIALISING, State.ACTIVE); assertEquals("Unexpected host name", hostName, _host.getName()); assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType()); - assertEquals("Unexpected store type", BDBMessageStore.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE)); + assertEquals("Unexpected store type", new BDBMessageStoreFactory().getType(), _host.getAttribute(VirtualHost.STORE_TYPE)); assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH)); BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore(); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java index 138b4bc398..330abce5cf 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java @@ -32,7 +32,6 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.ReplicationNode; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade; import org.apache.qpid.test.utils.QpidTestCase; public class LocalReplicationNodeTest extends QpidTestCase @@ -184,6 +183,7 @@ public class LocalReplicationNodeTest extends QpidTestCase attributes.put(ReplicationNode.GROUP_NAME, "testGroup"); attributes.put(ReplicationNode.HOST_PORT, "localhost:5000"); attributes.put(ReplicationNode.HELPER_HOST_PORT, "localhost:5001"); + attributes.put(ReplicationNode.STORE_PATH, TMP_FOLDER + File.separator + getTestName()); return attributes; } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/NoopReplicationGroupListener.java index f4bb79596a..21c902ae8f 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/NoopReplicationGroupListener.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.server.store.berkeleydb; +package org.apache.qpid.server.store.berkeleydb.replication; import org.apache.qpid.server.model.ReplicationNode; import org.apache.qpid.server.replication.ReplicationGroupListener; diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 806c108acd..cea7d52d43 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.server.store.berkeleydb; +package org.apache.qpid.server.store.berkeleydb.replication; import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC; import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY; @@ -28,6 +28,7 @@ import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT; import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT; import static org.apache.qpid.server.model.ReplicationNode.NAME; import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS; +import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -47,8 +48,11 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.model.ReplicationNode; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.replication.ReplicationGroupListener; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode; import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestFileUtils; import org.apache.qpid.util.FileUtils; @@ -65,7 +69,6 @@ import com.sleepycat.je.rep.StateChangeListener; public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { - protected File _storePath; private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort(); private static final int LISTENER_TIMEOUT = 5; private static final int WAIT_STATE_CHANGE_TIMEOUT = 30; @@ -76,6 +79,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString(); private static final boolean TEST_DESIGNATED_PRIMARY = false; private static final boolean TEST_COALESCING_SYNC = true; + + private File _storePath; private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>(); private VirtualHost _virtualHost = mock(VirtualHost.class); private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = new ReplicatedEnvironmentFacadeFactory.RemoteReplicationNodeFactoryImpl(_virtualHost); @@ -540,24 +545,12 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertTrue("Replica " + nodeName + " was not started", testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); return replicaEnvironmentFacade; } - - private String createNodeWorkingFolder(String nodeName) - { - File nodeLocation = new File(_storePath, nodeName); - if (!nodeLocation.exists()) - { - nodeLocation.mkdirs(); - } - final String nodePath = nodeLocation.getAbsolutePath(); - return nodePath; - } - private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary, State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) { - final String nodePath = createNodeWorkingFolder(nodeName); + ReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary); - ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(nodePath, node, _remoteReplicationNodeFactory); + ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(node, _remoteReplicationNodeFactory); ref.setReplicationGroupListener(replicationGroupListener); ref.setStateChangeListener(stateChangeListener); _nodes.put(nodeName, ref); @@ -594,6 +587,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s"); repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); when(node.getAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig); + + when(node.getAttribute(STORE_PATH)).thenReturn(new File(_storePath, nodeName).getAbsolutePath()); return node; } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestStateChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java index 5b5a507c1f..1e244e1f89 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestStateChangeListener.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.server.store.berkeleydb; +package org.apache.qpid.server.store.berkeleydb.replication; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java index c190498a36..6bddf5876f 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -38,6 +38,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index f8b0fb362c..d7b27baa4e 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -19,6 +19,7 @@ */ package org.apache.qpid.server.store.berkeleydb; +import java.io.File; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; @@ -41,6 +42,7 @@ import javax.jms.Connection; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.ReplicationNode; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -69,7 +71,7 @@ public class HATestClusterCreator private final String _virtualHostName; private final String _ipAddressOfBroker; - private final String _groupName ; + private final String _groupName; private final int _numberOfNodes; private int _bdbHelperPort; private int _primaryBrokerPort; @@ -350,6 +352,7 @@ public class HATestClusterCreator { replicationNodeAttributes.put(ReplicationNode.REPLICATION_PARAMETERS, replicationParameters); } + replicationNodeAttributes.put(ReplicationNode.STORE_PATH, System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + nodeName); // ha virtual host Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); |
