summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-01-22 16:08:18 +0000
committerKeith Wall <kwall@apache.org>2014-01-22 16:08:18 +0000
commitc85cc192b9582d103cec0381b7e91d459e1db00c (patch)
tree6b8906292d46262ef191ae5274767d27220da94a /qpid/java
parentdc5b63d51c8025ee797ee916495d01bbdd6c692e (diff)
downloadqpid-python-c85cc192b9582d103cec0381b7e91d459e1db00c.tar.gz
QPID-5409: Refactring of bdb ha replication functionality and fixing tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1560400 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java2
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java2
-rw-r--r--qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java14
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java11
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java3
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java71
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java37
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java56
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java40
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java)163
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java)17
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java11
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java3
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java3
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/NoopReplicationGroupListener.java (renamed from qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java)2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java (renamed from qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java)27
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java (renamed from qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestStateChangeListener.java)2
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java1
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java5
27 files changed, 274 insertions, 221 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
index 8351089bde..757949cf61 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
@@ -39,7 +39,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObject;
-import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
/**
* Management mbean for BDB HA.
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
index 62c11a307c..16199d30a3 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
@@ -29,7 +29,7 @@ import org.apache.qpid.server.jmx.ManagedObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
-import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
/**
* This provide will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual
diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
index f8c330450c..154c25e4dd 100644
--- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
+++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
@@ -43,9 +43,9 @@ import org.apache.qpid.server.jmx.ManagedObjectRegistry;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
-import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBean;
import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
public class BDBHAMessageStoreManagerMBeanTest extends TestCase
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java
index 9b97fec479..5a498470fb 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.store.berkeleydb;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.util.DbBackup;
import org.apache.log4j.Logger;
@@ -336,17 +335,4 @@ public class BDBBackup
return backedUpFileNames.toArray(new String[backedUpFileNames.size()]);
}
- /*
- * Creates an environment for the bdb log files in the specified directory. This envrinonment can only be used
- * to backup these files, if they are not locked by another database instance.
- *
- * @param fromdir The path to the directory to create the environment for.
- *
- * @throws DatabaseException Any underlying exceptions from BDB are allowed to fall through.
- */
- private Environment createSourceDirEnvironment(String fromdir) throws DatabaseException
- {
- // Initialize the BDB backup utility on the source directory.
- return new Environment(new File(fromdir), new EnvironmentConfig());
- }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index 8d29e89472..17e04707fc 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -20,16 +20,9 @@ package org.apache.qpid.server.store.berkeleydb;
*
*/
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.AbstractActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.replication.ReplicationGroupListener;
@@ -40,6 +33,8 @@ import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.OperationalLoggingListener;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider;
import org.apache.qpid.server.virtualhost.State;
@@ -71,7 +66,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost) throws Exception
{
- _messageStore = new BDBMessageStore(ReplicatedEnvironmentFacade.TYPE, new ReplicatedEnvironmentFacadeFactory());
+ _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory());
final MessageStoreLogSubject storeLogSubject =
new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
index 94a535ed0e..48d863530a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
@@ -83,7 +83,6 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory
{
LinkedHashMap<String,Object> convertedMap = new LinkedHashMap<String, Object>();
Configuration storeConfiguration = configuration.subset("store");
- convertedMap.put(org.apache.qpid.server.model.VirtualHost.STORE_PATH, storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY));
convertedMap.put(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.OVERFULL_SIZE_PROPERTY));
convertedMap.put(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY));
return convertedMap;
@@ -119,6 +118,8 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory
attributes.put(ReplicationNode.GROUP_NAME, groupName);
attributes.put(ReplicationNode.HOST_PORT, storeConfiguration.getString("highAvailability.nodeHostPort"));
attributes.put(ReplicationNode.HELPER_HOST_PORT, storeConfiguration.getString("highAvailability.helperHostPort"));
+ attributes.put(org.apache.qpid.server.model.VirtualHost.STORE_PATH,
+ storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY));
String durability = storeConfiguration.getString("highAvailability.durability");
if (durability != null)
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 377ccd7a24..d69a1f9201 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -25,6 +25,7 @@ import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.je.*;
import com.sleepycat.je.Transaction;
+
import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueableMessage;
@@ -71,7 +73,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
public static final int VERSION = 7;
- public static final String TYPE = "BDB";
+ public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
private static final int LOCK_RETRY_ATTEMPTS = 5;
private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS";
@@ -104,7 +106,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private long _persistentSizeHighThreshold;
private final EventManager _eventManager = new EventManager();
- private String _storeLocation;
private final String _type;
private VirtualHost _virtualHost;
@@ -114,12 +115,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public BDBMessageStore()
{
- this(TYPE, new StandardEnvironmentFacadeFactory());
+ this(new StandardEnvironmentFacadeFactory());
}
- public BDBMessageStore(String type, EnvironmentFacadeFactory environmentFacadeFactory)
+ public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
{
- _type = type;
+ _type = environmentFacadeFactory.getType();;
_environmentFacadeFactory = environmentFacadeFactory;
_stateManager = new StateManager(_eventManager);
}
@@ -218,27 +219,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private void configure(VirtualHost virtualHost, boolean isMessageStore) throws AMQStoreException
{
- String name = virtualHost.getName();
- final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name;
-
- String storeLocation;
- if(isMessageStore)
- {
- storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
- if(storeLocation == null)
- {
- storeLocation = defaultPath;
- }
- }
- else // we are acting only as the durable config store
- {
- storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH);
- if(storeLocation == null)
- {
- storeLocation = defaultPath;
- }
- }
-
Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
@@ -253,29 +233,20 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_persistentSizeLowThreshold = _persistentSizeHighThreshold;
}
- File environmentPath = new File(storeLocation);
- if (!environmentPath.exists())
- {
- if (!environmentPath.mkdirs())
- {
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
- }
-
- _storeLocation = storeLocation;
-
- LOGGER.info("Setting up environment");
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeLocation, virtualHost);
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHost, isMessageStore);
- _committer = _environmentFacade.createCommitter(null);
+ _committer = _environmentFacade.createCommitter(virtualHost.getName());
_committer.start();
}
@Override
public String getStoreLocation()
{
- return _storeLocation;
+ if (_environmentFacade == null)
+ {
+ return null;
+ }
+ return _environmentFacade.getStoreLocation();
}
public EnvironmentFacade getEnvironmentFacade()
@@ -1695,19 +1666,21 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
@Override
public void onDelete()
{
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleting store " + _storeLocation);
- }
+ String storeLocation = getStoreLocation();
- if (_storeLocation != null)
+ if (storeLocation != null)
{
- File location = new File(_storeLocation);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleting store " + storeLocation);
+ }
+
+ File location = new File(storeLocation);
if (location.exists())
{
if (!FileUtils.delete(location, true))
{
- LOGGER.error("Cannot delete " + _storeLocation);
+ LOGGER.error("Cannot delete " + storeLocation);
}
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
index d7c8b23d39..4abe81c56c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
@@ -37,7 +38,7 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi
@Override
public String getType()
{
- return BDBMessageStore.TYPE;
+ return StandardEnvironmentFacade.TYPE;
}
@Override
@@ -71,7 +72,7 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi
if(initialSize != 0)
{
- return Collections.singletonMap("bdbEnvironmentConfig", (Object)attributes);
+ return Collections.singletonMap(BDBMessageStore.ENVIRONMENT_CONFIGURATION, (Object)attributes);
}
else
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index c2aef3d25d..60ff529203 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -56,4 +56,6 @@ public interface EnvironmentFacade
void close();
+ String getStoreLocation();
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
index 8b4738b4d3..b784e436b9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
@@ -25,6 +25,8 @@ import org.apache.qpid.server.model.VirtualHost;
public interface EnvironmentFacadeFactory
{
- EnvironmentFacade createEnvironmentFacade(String storeLocation, VirtualHost virtualHost);
+ EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore);
+
+ String getType();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java
new file mode 100644
index 0000000000..b13766a136
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.log4j.Logger;
+
+import com.sleepycat.je.ExceptionEvent;
+import com.sleepycat.je.ExceptionListener;
+
+public class LoggingAsyncExceptionListener implements ExceptionListener
+{
+ private static final Logger LOGGER = Logger.getLogger(LoggingAsyncExceptionListener.class);
+
+ @Override
+ public void exceptionThrown(ExceptionEvent event)
+ {
+ LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException());
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index 4bf228c478..3b6eef832b 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -32,21 +32,35 @@ import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.ExceptionEvent;
-import com.sleepycat.je.ExceptionListener;
public class StandardEnvironmentFacade implements EnvironmentFacade
{
private static final Logger LOGGER = Logger.getLogger(StandardEnvironmentFacade.class);
public static final String TYPE = "BDB";
- private Environment _environment;
+ private final String _storePath;
private final Map<String, Database> _databases = new HashMap<String, Database>();
+ private Environment _environment;
+
public StandardEnvironmentFacade(String storePath, Map<String, String> attributes)
{
+ _storePath = storePath;
- LOGGER.info("BDB message store using environment path " + storePath);
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Creating environment at environment path " + _storePath);
+ }
+
+ File environmentPath = new File(storePath);
+ if (!environmentPath.exists())
+ {
+ if (!environmentPath.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
+ }
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
@@ -60,26 +74,9 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
- try
- {
- _environment = new Environment(new File(storePath), envConfig);
- }
- catch (DatabaseException de)
- {
- if (de.getMessage().contains("Environment.setAllowCreate is false"))
- {
- // Allow the creation this time
- envConfig.setAllowCreate(true);
- _environment = new Environment(new File(storePath), envConfig);
- }
- else
- {
- throw de;
- }
- }
+ _environment = new Environment(environmentPath, envConfig);
}
-
@Override
public void commit(com.sleepycat.je.Transaction tx) throws AMQStoreException
{
@@ -196,15 +193,6 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
return new AMQStoreException(contextMessage, e);
}
- private class LoggingAsyncExceptionListener implements ExceptionListener
- {
- @Override
- public void exceptionThrown(ExceptionEvent event)
- {
- LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException());
- }
- }
-
@Override
public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
{
@@ -232,4 +220,10 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
return new CoalescingCommiter(name, this);
}
+ @Override
+ public String getStoreLocation()
+ {
+ return _storePath;
+ }
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
index 7ffee19d78..384ceba98a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
@@ -20,9 +20,11 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import java.io.File;
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.VirtualHost;
public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory
@@ -30,17 +32,45 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor
@SuppressWarnings("unchecked")
@Override
- public EnvironmentFacade createEnvironmentFacade(String storePath, VirtualHost virtualHost)
+ public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore)
{
Map<String, String> envConfigMap = new HashMap<String, String>();
envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
- Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig");
- if (bdbEnvConfigAttr instanceof Map)
+ Object environmentConfigurationAttributes = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION);
+ if (environmentConfigurationAttributes instanceof Map)
{
- envConfigMap.putAll((Map<String, String>) bdbEnvConfigAttr);
+ envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes);
}
- return new StandardEnvironmentFacade(storePath, envConfigMap);
+
+ String name = virtualHost.getName();
+ final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name;
+
+ String storeLocation;
+ if(isMessageStore)
+ {
+ storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ if(storeLocation == null)
+ {
+ storeLocation = defaultPath;
+ }
+ }
+ else // we are acting only as the durable config store
+ {
+ storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH);
+ if(storeLocation == null)
+ {
+ storeLocation = defaultPath;
+ }
+ }
+
+ return new StandardEnvironmentFacade(storeLocation, envConfigMap);
+ }
+
+ @Override
+ public String getType()
+ {
+ return StandardEnvironmentFacade.TYPE;
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
index ca3e858f01..38fdf34196 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
@@ -25,7 +25,6 @@ import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Transaction;
@@ -35,7 +34,6 @@ public class DatabasePinger
public static final String PING_DATABASE_NAME = "PINGDB";
private static final int ID = 0;
-
public void pingDb(EnvironmentFacade facade)
{
try
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
index b4a83e416e..556bcc54c3 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
@@ -37,7 +37,6 @@ import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.adapter.AbstractAdapter;
import org.apache.qpid.server.model.adapter.NoStatistics;
-import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ParameterizedTypeImpl;
@@ -110,6 +109,11 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
{
throw new IllegalConfigurationException("Helper host and port attribute is not specified");
}
+ Object storePath = attributes.get(STORE_PATH);
+ if (storePath == null || storePath.equals(""))
+ {
+ throw new IllegalConfigurationException("Store path is not specified for the replication node");
+ }
return attributes;
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 2c8d7acc06..6c4f33364a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.replication;
import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC;
import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY;
@@ -28,6 +28,7 @@ import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS;
import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
+import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH;
import java.io.File;
import java.net.InetSocketAddress;
@@ -53,9 +54,10 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.replication.ReplicationGroupListener;
-import org.apache.qpid.server.store.berkeleydb.replication.DatabasePinger;
-import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
-import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
+import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
+import org.apache.qpid.server.store.berkeleydb.Committer;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
import org.apache.qpid.server.util.DaemonThreadFactory;
import com.sleepycat.je.Database;
@@ -65,8 +67,6 @@ import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
-import com.sleepycat.je.ExceptionEvent;
-import com.sleepycat.je.ExceptionListener;
import com.sleepycat.je.OperationFailureException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.InsufficientLogException;
@@ -81,6 +81,7 @@ import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+import com.sleepycat.je.utilint.PropUtil;
public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
{
@@ -141,12 +142,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private final Durability _durability;
private final boolean _designatedPrimary;
private final boolean _coalescingSync;
- private final String _environmentPath;
+ private final File _environmentDirectory;
private final Map<String, String> _environmentParameters;
private final Map<String, String> _replicationEnvironmentParameters;
private final ExecutorService _restartEnvironmentExecutor;
private final ScheduledExecutorService _groupChangeExecutor;
- private final AtomicReference<State> _state = new AtomicReference<State>(State.INITIAL);
+ private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING);
private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
private final ConcurrentMap<String, RemoteReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, RemoteReplicationNode>();
private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory;
@@ -158,10 +159,19 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private String _lastKnownReplicationTransactionId;
@SuppressWarnings("unchecked")
- public ReplicatedEnvironmentFacade(String environmentPath, org.apache.qpid.server.model.ReplicationNode replicationNode,
+ public ReplicatedEnvironmentFacade(org.apache.qpid.server.model.ReplicationNode replicationNode,
RemoteReplicationNodeFactory remoteReplicationNodeFactory)
{
- _environmentPath = environmentPath;
+ _environmentDirectory = new File((String)replicationNode.getAttribute(STORE_PATH));
+ if (!_environmentDirectory.exists())
+ {
+ if (!_environmentDirectory.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path " + _environmentDirectory + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
+ }
+
_groupName = (String)replicationNode.getAttribute(GROUP_NAME);
_nodeName = replicationNode.getName();
_nodeHostPort = (String)replicationNode.getAttribute(HOST_PORT);;
@@ -177,43 +187,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
_remoteReplicationNodeFactory = remoteReplicationNodeFactory;
- _state.set(State.OPENING);
_groupChangeExecutor.scheduleWithFixedDelay(new GroupChangeLearner(), 0, GROUP_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
_groupChangeExecutor.schedule(new RemoteNodeStateLearner(), _remoteReplicationNodeFactory.getRemoteNodeMonitorInterval(), TimeUnit.MILLISECONDS);
// create environment in a separate thread to avoid renaming of the current thread by JE
- Future<ReplicatedEnvironment> environmentFuture = _restartEnvironmentExecutor.submit(new Callable<ReplicatedEnvironment>(){
- @Override
- public ReplicatedEnvironment call() throws Exception
- {
- String originalThreadName = Thread.currentThread().getName();
- try
- {
- return createEnvironment();
- }
- finally
- {
- Thread.currentThread().setName(originalThreadName);
- }
- }});
-
- // TODO: evaluate the future timeout from JE ENVIRONMENT_SETUP
- try
- {
- _environment = environmentFuture.get(15 * 2, TimeUnit.MINUTES);
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException("Unexpected exception on environment creation", e.getCause());
- }
- catch (TimeoutException e)
- {
- throw new RuntimeException("JE environment has not been created in due time");
- }
+ _environment = createEnvironment(true);
populateExistingRemoteReplicationNodes();
}
@@ -235,12 +213,17 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public void close()
{
- if (_state.compareAndSet(State.INITIAL, State.CLOSING) || _state.compareAndSet(State.OPENING, State.CLOSING) ||
- _state.compareAndSet(State.OPEN, State.CLOSING) || _state.compareAndSet(State.RESTARTING, State.CLOSING) )
+ if (_state.compareAndSet(State.OPENING, State.CLOSING) ||
+ _state.compareAndSet(State.OPEN, State.CLOSING) ||
+ _state.compareAndSet(State.RESTARTING, State.CLOSING) )
{
try
{
- LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName);
+ }
+
_restartEnvironmentExecutor.shutdown();
_groupChangeExecutor.shutdown();
closeDatabases();
@@ -321,7 +304,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private void openDatabaseInternally(String databaseName, DatabaseHolder holder)
{
- LOGGER.debug("Opening database " + databaseName + " on " + _prettyGroupNodeName);
Database database = _environment.openDatabase(null, databaseName, holder.getConfig());
holder.setDatabase(database);
}
@@ -352,6 +334,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
@Override
+ public String getStoreLocation()
+ {
+ return _environmentDirectory.getAbsolutePath();
+ }
+
+ @Override
public void stateChange(final StateChangeEvent stateChangeEvent)
{
_groupChangeExecutor.submit(new Runnable()
@@ -381,7 +369,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
reopenDatabases();
StateChangeListener listener = _stateChangeListener.get();
- LOGGER.debug("Application state change listener " + listener);
+
if (listener != null)
{
listener.stateChange(stateChangeEvent);
@@ -652,7 +640,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
closeEnvironmentSafely();
- _environment = createEnvironment();
+ _environment = createEnvironment(false);
if (_stateChangeListener.get() != null)
{
@@ -692,7 +680,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private void closeDatabases()
{
RuntimeException firstThrownException = null;
- LOGGER.debug("Closing databases " + _databases);
for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
{
DatabaseHolder databaseHolder = entry.getValue();
@@ -701,7 +688,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
try
{
- LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName);
+ }
+
database.close();
}
catch(RuntimeException e)
@@ -724,12 +715,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- private ReplicatedEnvironment createEnvironment()
+ private ReplicatedEnvironment createEnvironment(boolean createEnvironmentInSeparateThread)
{
if (LOGGER.isInfoEnabled())
{
LOGGER.info("Creating environment");
- LOGGER.info("Environment path " + _environmentPath);
+ LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath());
LOGGER.info("Group name " + _groupName);
LOGGER.info("Node name " + _nodeName);
LOGGER.info("Node host port " + _nodeHostPort);
@@ -750,7 +741,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
environmentSettings.putAll(_environmentParameters);
}
- final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort);
+ ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort);
replicationConfig.setHelperHosts(_helperHostPort);
replicationConfig.setDesignatedPrimary(_designatedPrimary);
@@ -778,8 +769,58 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
}
+ if (createEnvironmentInSeparateThread)
+ {
+ return createEnvironmentInSeparateThread(_environmentDirectory, envConfig, replicationConfig);
+ }
+ else
+ {
+ return createEnvironment(_environmentDirectory, envConfig, replicationConfig);
+ }
+ }
+
+ private ReplicatedEnvironment createEnvironmentInSeparateThread(final File environmentPathFile, final EnvironmentConfig envConfig,
+ final ReplicationConfig replicationConfig)
+ {
+ Future<ReplicatedEnvironment> environmentFuture = _restartEnvironmentExecutor.submit(new Callable<ReplicatedEnvironment>(){
+ @Override
+ public ReplicatedEnvironment call() throws Exception
+ {
+ String originalThreadName = Thread.currentThread().getName();
+ try
+ {
+ return createEnvironment(environmentPathFile, envConfig, replicationConfig);
+ }
+ finally
+ {
+ Thread.currentThread().setName(originalThreadName);
+ }
+ }});
+
+ long setUpTimeOutMillis = PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
+ try
+ {
+ return environmentFuture.get(setUpTimeOutMillis, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Environment creation was interrupted", e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException("Unexpected exception on environment creation", e.getCause());
+ }
+ catch (TimeoutException e)
+ {
+ throw new RuntimeException("JE environment has not been created in due time");
+ }
+ }
+
+ private ReplicatedEnvironment createEnvironment(File environmentPathFile, EnvironmentConfig envConfig,
+ final ReplicationConfig replicationConfig)
+ {
ReplicatedEnvironment environment = null;
- File environmentPathFile = new File(_environmentPath);
try
{
environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
@@ -875,7 +916,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- //TODO: move the class into external class
private class RemoteNodeStateLearner implements Callable<Void>
{
private Map<String, String> _previousGroupState = Collections.emptyMap();
@@ -944,18 +984,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- private class LoggingAsyncExceptionListener implements ExceptionListener
- {
- @Override
- public void exceptionThrown(ExceptionEvent event)
- {
- LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException());
- }
- }
-
public static enum State
{
- INITIAL, // TODO unused remove
OPENING,
OPEN,
RESTARTING,
@@ -995,4 +1025,5 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index 324b7b647a..0ddd7134ac 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.replication;
import java.util.Collection;
import java.util.HashMap;
@@ -27,9 +27,8 @@ import java.util.Map;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNode;
-import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
-import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Durability.SyncPolicy;
@@ -38,7 +37,7 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
{
@Override
- public EnvironmentFacade createEnvironmentFacade(String storeLocation, VirtualHost virtualHost)
+ public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore)
{
Collection<ReplicationNode> replicationNodes = virtualHost.getChildren(ReplicationNode.class);
if (replicationNodes == null || replicationNodes.size() != 1)
@@ -59,7 +58,7 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
+ "! Please set highAvailability.coalescingSync to false in store configuration.");
}
- ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(storeLocation, localNode, new RemoteReplicationNodeFactoryImpl(virtualHost));
+ ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(localNode, new RemoteReplicationNodeFactoryImpl(virtualHost));
((LocalReplicationNode)localNode).setReplicatedEnvironmentFacade(facade);
return facade;
}
@@ -89,4 +88,10 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
return (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL);
}
}
+
+ @Override
+ public String getType()
+ {
+ return ReplicatedEnvironmentFacade.TYPE;
+ }
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
index 4684358190..7a645a6932 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
@@ -65,7 +65,7 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB
_logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
Map<String,String> envMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE);
- when(virtualHost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(envMap);
+ when(virtualHost.getAttribute(eq(BDBMessageStore.ENVIRONMENT_CONFIGURATION))).thenReturn(envMap);
when(virtualHost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE);
when(virtualHost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE);
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
index 730001d849..48fb180984 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
@@ -22,20 +22,15 @@ package org.apache.qpid.server.store.berkeleydb;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreCreator;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.test.utils.QpidTestCase;
public class MessageStoreCreatorTest extends QpidTestCase
{
- private static final String[] STORE_TYPES = {BDBMessageStore.TYPE};
-
public void testMessageStoreCreator()
{
MessageStoreCreator messageStoreCreator = new MessageStoreCreator();
- for (String type : STORE_TYPES)
- {
- MessageStore store = messageStoreCreator.createMessageStore(type);
- assertNotNull("Store of type " + type + " is not created", store);
- }
+ String type = new BDBMessageStoreFactory().getType();
+ MessageStore store = messageStoreCreator.createMessageStore(type);
+ assertNotNull("Store of type " + type + " is not created", store);
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
index 5bad51fbc3..b19e18b204 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
@@ -24,7 +24,6 @@ import java.io.File;
import java.util.Collections;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.test.utils.TestFileUtils;
import org.apache.qpid.util.FileUtils;
import com.sleepycat.je.Database;
@@ -39,7 +38,7 @@ public class StandardEnvironmentFacadeTest extends QpidTestCase
protected void setUp() throws Exception
{
super.setUp();
- _storePath = TestFileUtils.createTestDirectory("bdb", true);
+ _storePath = new File(TMP_FOLDER + File.separator + "bdb" + File.separator + getTestName());
}
protected void tearDown() throws Exception
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
index 97fd927f01..7269988042 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
@@ -46,6 +46,7 @@ import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -185,7 +186,7 @@ public class VirtualHostTest extends QpidTestCase
_host.setDesiredState(State.INITIALISING, State.ACTIVE);
assertEquals("Unexpected host name", hostName, _host.getName());
assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType());
- assertEquals("Unexpected store type", BDBMessageStore.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE));
+ assertEquals("Unexpected store type", new BDBMessageStoreFactory().getType(), _host.getAttribute(VirtualHost.STORE_TYPE));
assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
index 138b4bc398..330abce5cf 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
@@ -32,7 +32,6 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade;
import org.apache.qpid.test.utils.QpidTestCase;
public class LocalReplicationNodeTest extends QpidTestCase
@@ -184,6 +183,7 @@ public class LocalReplicationNodeTest extends QpidTestCase
attributes.put(ReplicationNode.GROUP_NAME, "testGroup");
attributes.put(ReplicationNode.HOST_PORT, "localhost:5000");
attributes.put(ReplicationNode.HELPER_HOST_PORT, "localhost:5001");
+ attributes.put(ReplicationNode.STORE_PATH, TMP_FOLDER + File.separator + getTestName());
return attributes;
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/NoopReplicationGroupListener.java
index f4bb79596a..21c902ae8f 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/NoopReplicationGroupListener.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.replication;
import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.replication.ReplicationGroupListener;
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 806c108acd..cea7d52d43 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.replication;
import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC;
import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY;
@@ -28,6 +28,7 @@ import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
import static org.apache.qpid.server.model.ReplicationNode.NAME;
import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
+import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -47,8 +48,11 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.replication.ReplicationGroupListener;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
import org.apache.qpid.util.FileUtils;
@@ -65,7 +69,6 @@ import com.sleepycat.je.rep.StateChangeListener;
public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
- protected File _storePath;
private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
private static final int LISTENER_TIMEOUT = 5;
private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
@@ -76,6 +79,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString();
private static final boolean TEST_DESIGNATED_PRIMARY = false;
private static final boolean TEST_COALESCING_SYNC = true;
+
+ private File _storePath;
private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
private VirtualHost _virtualHost = mock(VirtualHost.class);
private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = new ReplicatedEnvironmentFacadeFactory.RemoteReplicationNodeFactoryImpl(_virtualHost);
@@ -540,24 +545,12 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertTrue("Replica " + nodeName + " was not started", testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
return replicaEnvironmentFacade;
}
-
- private String createNodeWorkingFolder(String nodeName)
- {
- File nodeLocation = new File(_storePath, nodeName);
- if (!nodeLocation.exists())
- {
- nodeLocation.mkdirs();
- }
- final String nodePath = nodeLocation.getAbsolutePath();
- return nodePath;
- }
-
private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
{
- final String nodePath = createNodeWorkingFolder(nodeName);
+
ReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary);
- ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(nodePath, node, _remoteReplicationNodeFactory);
+ ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(node, _remoteReplicationNodeFactory);
ref.setReplicationGroupListener(replicationGroupListener);
ref.setStateChangeListener(stateChangeListener);
_nodes.put(nodeName, ref);
@@ -594,6 +587,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
when(node.getAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig);
+
+ when(node.getAttribute(STORE_PATH)).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
return node;
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestStateChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java
index 5b5a507c1f..1e244e1f89 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestStateChangeListener.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.replication;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
index c190498a36..6bddf5876f 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -38,6 +38,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
index f8b0fb362c..d7b27baa4e 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
@@ -41,6 +42,7 @@ import javax.jms.Connection;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -69,7 +71,7 @@ public class HATestClusterCreator
private final String _virtualHostName;
private final String _ipAddressOfBroker;
- private final String _groupName ;
+ private final String _groupName;
private final int _numberOfNodes;
private int _bdbHelperPort;
private int _primaryBrokerPort;
@@ -350,6 +352,7 @@ public class HATestClusterCreator
{
replicationNodeAttributes.put(ReplicationNode.REPLICATION_PARAMETERS, replicationParameters);
}
+ replicationNodeAttributes.put(ReplicationNode.STORE_PATH, System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + nodeName);
// ha virtual host
Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();