From c85cc192b9582d103cec0381b7e91d459e1db00c Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Wed, 22 Jan 2014 16:08:18 +0000 Subject: QPID-5409: Refactring of bdb ha replication functionality and fixing tests git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1560400 13f79535-47bb-0310-9956-ffa450edef68 --- .../jmx/BDBHAMessageStoreManagerMBean.java | 2 +- .../jmx/BDBHAMessageStoreManagerMBeanProvider.java | 2 +- .../jmx/BDBHAMessageStoreManagerMBeanTest.java | 2 +- .../qpid/server/store/berkeleydb/BDBBackup.java | 14 - .../server/store/berkeleydb/BDBHAVirtualHost.java | 11 +- .../store/berkeleydb/BDBHAVirtualHostFactory.java | 3 +- .../server/store/berkeleydb/BDBMessageStore.java | 71 +- .../store/berkeleydb/BDBMessageStoreFactory.java | 5 +- .../server/store/berkeleydb/EnvironmentFacade.java | 2 + .../store/berkeleydb/EnvironmentFacadeFactory.java | 4 +- .../berkeleydb/LoggingAsyncExceptionListener.java | 37 + .../berkeleydb/ReplicatedEnvironmentFacade.java | 998 ------------------- .../ReplicatedEnvironmentFacadeFactory.java | 92 -- .../berkeleydb/StandardEnvironmentFacade.java | 56 +- .../StandardEnvironmentFacadeFactory.java | 40 +- .../berkeleydb/replication/DatabasePinger.java | 2 - .../replication/LocalReplicationNode.java | 6 +- .../replication/ReplicatedEnvironmentFacade.java | 1029 ++++++++++++++++++++ .../ReplicatedEnvironmentFacadeFactory.java | 97 ++ .../berkeleydb/BDBMessageStoreQuotaEventsTest.java | 2 +- .../store/berkeleydb/MessageStoreCreatorTest.java | 11 +- .../berkeleydb/NoopReplicationGroupListener.java | 42 - .../ReplicatedEnvironmentFacadeTest.java | 599 ------------ .../berkeleydb/StandardEnvironmentFacadeTest.java | 3 +- .../store/berkeleydb/TestStateChangeListener.java | 54 - .../server/store/berkeleydb/VirtualHostTest.java | 3 +- .../replication/LocalReplicationNodeTest.java | 2 +- .../replication/NoopReplicationGroupListener.java | 42 + .../ReplicatedEnvironmentFacadeTest.java | 594 +++++++++++ .../replication/TestStateChangeListener.java | 54 + .../store/berkeleydb/HAClusterManagementTest.java | 1 + .../store/berkeleydb/HATestClusterCreator.java | 5 +- 32 files changed, 1969 insertions(+), 1916 deletions(-) create mode 100644 qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java delete mode 100644 qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java delete mode 100644 qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java create mode 100644 qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java create mode 100644 qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java delete mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java delete mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java delete mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestStateChangeListener.java create mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/NoopReplicationGroupListener.java create mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java create mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java index 8351089bde..757949cf61 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java @@ -39,7 +39,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.jmx.AMQManagedObject; import org.apache.qpid.server.jmx.ManagedObject; -import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; /** * Management mbean for BDB HA. diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java index 62c11a307c..16199d30a3 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java @@ -29,7 +29,7 @@ import org.apache.qpid.server.jmx.ManagedObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; -import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; /** * This provide will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java index f8c330450c..154c25e4dd 100644 --- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java +++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java @@ -43,9 +43,9 @@ import org.apache.qpid.server.jmx.ManagedObjectRegistry; import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; -import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBean; import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; public class BDBHAMessageStoreManagerMBeanTest extends TestCase { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java index 9b97fec479..5a498470fb 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.store.berkeleydb; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.util.DbBackup; import org.apache.log4j.Logger; @@ -336,17 +335,4 @@ public class BDBBackup return backedUpFileNames.toArray(new String[backedUpFileNames.size()]); } - /* - * Creates an environment for the bdb log files in the specified directory. This envrinonment can only be used - * to backup these files, if they are not locked by another database instance. - * - * @param fromdir The path to the directory to create the environment for. - * - * @throws DatabaseException Any underlying exceptions from BDB are allowed to fall through. - */ - private Environment createSourceDirEnvironment(String fromdir) throws DatabaseException - { - // Initialize the BDB backup utility on the source directory. - return new Environment(new File(fromdir), new EnvironmentConfig()); - } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index 8d29e89472..17e04707fc 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -20,16 +20,9 @@ package org.apache.qpid.server.store.berkeleydb; * */ -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.AbstractActor; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.replication.ReplicationGroupListener; @@ -40,6 +33,8 @@ import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.OperationalLoggingListener; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider; import org.apache.qpid.server.virtualhost.State; @@ -71,7 +66,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost) throws Exception { - _messageStore = new BDBMessageStore(ReplicatedEnvironmentFacade.TYPE, new ReplicatedEnvironmentFacadeFactory()); + _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory()); final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName()); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java index 94a535ed0e..48d863530a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java @@ -83,7 +83,6 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory { LinkedHashMap convertedMap = new LinkedHashMap(); Configuration storeConfiguration = configuration.subset("store"); - convertedMap.put(org.apache.qpid.server.model.VirtualHost.STORE_PATH, storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY)); convertedMap.put(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.OVERFULL_SIZE_PROPERTY)); convertedMap.put(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY)); return convertedMap; @@ -119,6 +118,8 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory attributes.put(ReplicationNode.GROUP_NAME, groupName); attributes.put(ReplicationNode.HOST_PORT, storeConfiguration.getString("highAvailability.nodeHostPort")); attributes.put(ReplicationNode.HELPER_HOST_PORT, storeConfiguration.getString("highAvailability.helperHostPort")); + attributes.put(org.apache.qpid.server.model.VirtualHost.STORE_PATH, + storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY)); String durability = storeConfiguration.getString("highAvailability.durability"); if (durability != null) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 377ccd7a24..d69a1f9201 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -25,6 +25,7 @@ import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.je.*; import com.sleepycat.je.Transaction; + import java.io.File; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; @@ -37,6 +38,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.message.EnqueableMessage; @@ -71,7 +73,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); public static final int VERSION = 7; - public static final String TYPE = "BDB"; + public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig"; private static final int LOCK_RETRY_ATTEMPTS = 5; private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; @@ -104,7 +106,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private long _persistentSizeHighThreshold; private final EventManager _eventManager = new EventManager(); - private String _storeLocation; private final String _type; private VirtualHost _virtualHost; @@ -114,12 +115,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public BDBMessageStore() { - this(TYPE, new StandardEnvironmentFacadeFactory()); + this(new StandardEnvironmentFacadeFactory()); } - public BDBMessageStore(String type, EnvironmentFacadeFactory environmentFacadeFactory) + public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory) { - _type = type; + _type = environmentFacadeFactory.getType();; _environmentFacadeFactory = environmentFacadeFactory; _stateManager = new StateManager(_eventManager); } @@ -218,27 +219,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private void configure(VirtualHost virtualHost, boolean isMessageStore) throws AMQStoreException { - String name = virtualHost.getName(); - final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name; - - String storeLocation; - if(isMessageStore) - { - storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); - if(storeLocation == null) - { - storeLocation = defaultPath; - } - } - else // we are acting only as the durable config store - { - storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); - if(storeLocation == null) - { - storeLocation = defaultPath; - } - } - Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); @@ -253,29 +233,20 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _persistentSizeLowThreshold = _persistentSizeHighThreshold; } - File environmentPath = new File(storeLocation); - if (!environmentPath.exists()) - { - if (!environmentPath.mkdirs()) - { - throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " - + "Ensure the path is correct and that the permissions are correct."); - } - } - - _storeLocation = storeLocation; - - LOGGER.info("Setting up environment"); - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeLocation, virtualHost); + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHost, isMessageStore); - _committer = _environmentFacade.createCommitter(null); + _committer = _environmentFacade.createCommitter(virtualHost.getName()); _committer.start(); } @Override public String getStoreLocation() { - return _storeLocation; + if (_environmentFacade == null) + { + return null; + } + return _environmentFacade.getStoreLocation(); } public EnvironmentFacade getEnvironmentFacade() @@ -1695,19 +1666,21 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore @Override public void onDelete() { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleting store " + _storeLocation); - } + String storeLocation = getStoreLocation(); - if (_storeLocation != null) + if (storeLocation != null) { - File location = new File(_storeLocation); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deleting store " + storeLocation); + } + + File location = new File(storeLocation); if (location.exists()) { if (!FileUtils.delete(location, true)) { - LOGGER.error("Cannot delete " + _storeLocation); + LOGGER.error("Cannot delete " + storeLocation); } } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java index d7c8b23d39..4abe81c56c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; + import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; @@ -37,7 +38,7 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi @Override public String getType() { - return BDBMessageStore.TYPE; + return StandardEnvironmentFacade.TYPE; } @Override @@ -71,7 +72,7 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi if(initialSize != 0) { - return Collections.singletonMap("bdbEnvironmentConfig", (Object)attributes); + return Collections.singletonMap(BDBMessageStore.ENVIRONMENT_CONFIGURATION, (Object)attributes); } else { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index c2aef3d25d..60ff529203 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -56,4 +56,6 @@ public interface EnvironmentFacade void close(); + String getStoreLocation(); + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java index 8b4738b4d3..b784e436b9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java @@ -25,6 +25,8 @@ import org.apache.qpid.server.model.VirtualHost; public interface EnvironmentFacadeFactory { - EnvironmentFacade createEnvironmentFacade(String storeLocation, VirtualHost virtualHost); + EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore); + + String getType(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java new file mode 100644 index 0000000000..b13766a136 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.log4j.Logger; + +import com.sleepycat.je.ExceptionEvent; +import com.sleepycat.je.ExceptionListener; + +public class LoggingAsyncExceptionListener implements ExceptionListener +{ + private static final Logger LOGGER = Logger.getLogger(LoggingAsyncExceptionListener.class); + + @Override + public void exceptionThrown(ExceptionEvent event) + { + LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException()); + } +} \ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java deleted file mode 100644 index 2c8d7acc06..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java +++ /dev/null @@ -1,998 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC; -import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY; -import static org.apache.qpid.server.model.ReplicationNode.DURABILITY; -import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME; -import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT; -import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT; -import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS; -import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS; - -import java.io.File; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.replication.ReplicationGroupListener; -import org.apache.qpid.server.store.berkeleydb.replication.DatabasePinger; -import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode; -import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory; -import org.apache.qpid.server.util.DaemonThreadFactory; - -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.DatabaseException; -import com.sleepycat.je.Durability; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.EnvironmentFailureException; -import com.sleepycat.je.ExceptionEvent; -import com.sleepycat.je.ExceptionListener; -import com.sleepycat.je.OperationFailureException; -import com.sleepycat.je.Transaction; -import com.sleepycat.je.rep.InsufficientLogException; -import com.sleepycat.je.rep.InsufficientReplicasException; -import com.sleepycat.je.rep.NetworkRestore; -import com.sleepycat.je.rep.NetworkRestoreConfig; -import com.sleepycat.je.rep.ReplicatedEnvironment; -import com.sleepycat.je.rep.ReplicationConfig; -import com.sleepycat.je.rep.ReplicationGroup; -import com.sleepycat.je.rep.ReplicationMutableConfig; -import com.sleepycat.je.rep.ReplicationNode; -import com.sleepycat.je.rep.StateChangeEvent; -import com.sleepycat.je.rep.StateChangeListener; -import com.sleepycat.je.rep.util.ReplicationGroupAdmin; - -public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener -{ - public static final String GROUP_CHECK_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.group_check_interval"; - - private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class); - private static final long DEFAULT_GROUP_CHECK_INTERVAL = 1000l; - private static final long GROUP_CHECK_INTERVAL = Long.getLong(GROUP_CHECK_INTERVAL_PROPERTY_NAME, DEFAULT_GROUP_CHECK_INTERVAL); - - @SuppressWarnings("serial") - private static final Map REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap() - {{ - /** - * Parameter decreased as the 24h default may lead very large log files for most users. - */ - put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h"); - /** - * Parameter increased as the 5 s default may lead to spurious timeouts. - */ - put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s"); - /** - * Parameter increased as the 10 s default may lead to spurious timeouts. - */ - put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s"); - /** - * Parameter increased as the 10 h default may cause user confusion. - */ - put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min"); - /** - * Parameter changed from default true so we adopt immediately adopt the new behaviour early. False - * is scheduled to become default after JE 5.0.48. - */ - put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString()); - /** - * Parameter decreased as a default 5min interval may lead to bigger data losses on Node - * with NO_SYN durability in case if such Node crushes. - */ - put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); - - /** - * Timeout to transit into UNKNOWN state if the majority is not available. - * By default it is switched off. - */ - put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s"); - }}); - - public static final String TYPE = "BDB-HA"; - - // TODO: JMX will change to observe the model, at that point these names will disappear - public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; - public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; - - private final String _prettyGroupNodeName; - private final String _groupName; - private final String _nodeName; - private final String _nodeHostPort; - private final String _helperHostPort; - private final Durability _durability; - private final boolean _designatedPrimary; - private final boolean _coalescingSync; - private final String _environmentPath; - private final Map _environmentParameters; - private final Map _replicationEnvironmentParameters; - private final ExecutorService _restartEnvironmentExecutor; - private final ScheduledExecutorService _groupChangeExecutor; - private final AtomicReference _state = new AtomicReference(State.INITIAL); - private final ConcurrentMap _databases = new ConcurrentHashMap(); - private final ConcurrentMap _remoteReplicationNodes = new ConcurrentHashMap(); - private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory; - - private final AtomicReference _replicationGroupListener = new AtomicReference(); - private final AtomicReference _stateChangeListener = new AtomicReference(); - private volatile ReplicatedEnvironment _environment; - private long _joinTime; - private String _lastKnownReplicationTransactionId; - - @SuppressWarnings("unchecked") - public ReplicatedEnvironmentFacade(String environmentPath, org.apache.qpid.server.model.ReplicationNode replicationNode, - RemoteReplicationNodeFactory remoteReplicationNodeFactory) - { - _environmentPath = environmentPath; - _groupName = (String)replicationNode.getAttribute(GROUP_NAME); - _nodeName = replicationNode.getName(); - _nodeHostPort = (String)replicationNode.getAttribute(HOST_PORT);; - _helperHostPort = (String)replicationNode.getAttribute(HELPER_HOST_PORT); - _durability = Durability.parse((String)replicationNode.getAttribute(DURABILITY)); - _designatedPrimary = (Boolean)replicationNode.getAttribute(DESIGNATED_PRIMARY); - _coalescingSync = (Boolean)replicationNode.getAttribute(COALESCING_SYNC); - _environmentParameters = (Map)replicationNode.getAttribute(PARAMETERS); - _replicationEnvironmentParameters = (Map)replicationNode.getAttribute(REPLICATION_PARAMETERS); - _prettyGroupNodeName = _groupName + ":" + _nodeName; - - _restartEnvironmentExecutor = Executors.newFixedThreadPool(1, new DaemonThreadFactory("Environment-Starter:" + _prettyGroupNodeName)); - _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName)); - - _remoteReplicationNodeFactory = remoteReplicationNodeFactory; - _state.set(State.OPENING); - _groupChangeExecutor.scheduleWithFixedDelay(new GroupChangeLearner(), 0, GROUP_CHECK_INTERVAL, TimeUnit.MILLISECONDS); - _groupChangeExecutor.schedule(new RemoteNodeStateLearner(), _remoteReplicationNodeFactory.getRemoteNodeMonitorInterval(), TimeUnit.MILLISECONDS); - - // create environment in a separate thread to avoid renaming of the current thread by JE - Future environmentFuture = _restartEnvironmentExecutor.submit(new Callable(){ - @Override - public ReplicatedEnvironment call() throws Exception - { - String originalThreadName = Thread.currentThread().getName(); - try - { - return createEnvironment(); - } - finally - { - Thread.currentThread().setName(originalThreadName); - } - }}); - - // TODO: evaluate the future timeout from JE ENVIRONMENT_SETUP - try - { - _environment = environmentFuture.get(15 * 2, TimeUnit.MINUTES); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - } - catch (ExecutionException e) - { - throw new RuntimeException("Unexpected exception on environment creation", e.getCause()); - } - catch (TimeoutException e) - { - throw new RuntimeException("JE environment has not been created in due time"); - } - populateExistingRemoteReplicationNodes(); - } - - @Override - public void commit(final Transaction tx) throws AMQStoreException - { - try - { - // Using commit() instead of commitNoSync() for the HA store to allow - // the HA durability configuration to influence resulting behaviour. - tx.commit(); - } - catch (DatabaseException de) - { - throw handleDatabaseException("Got DatabaseException on commit, closing environment", de); - } - } - - @Override - public void close() - { - if (_state.compareAndSet(State.INITIAL, State.CLOSING) || _state.compareAndSet(State.OPENING, State.CLOSING) || - _state.compareAndSet(State.OPEN, State.CLOSING) || _state.compareAndSet(State.RESTARTING, State.CLOSING) ) - { - try - { - LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName); - _restartEnvironmentExecutor.shutdown(); - _groupChangeExecutor.shutdown(); - closeDatabases(); - closeEnvironment(); - } - finally - { - _state.compareAndSet(State.CLOSING, State.CLOSED); - } - } - } - - @Override - public AMQStoreException handleDatabaseException(String contextMessage, final DatabaseException dbe) - { - boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException); - if (restart) - { - if (_state.compareAndSet(State.OPEN, State.RESTARTING)) - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe); - } - _restartEnvironmentExecutor.execute(new Runnable() - { - @Override - public void run() - { - try - { - restartEnvironment(dbe); - } - catch (Exception e) - { - LOGGER.error("Exception on environment restart", e); - } - } - }); - - } - else - { - LOGGER.info("Cannot restart environment because of facade state: " + _state.get()); - } - } - return new AMQStoreException(contextMessage, dbe); - } - - @Override - public void openDatabases(DatabaseConfig dbConfig, String... databaseNames) - { - if (_state.get() != State.OPEN) - { - throw new IllegalStateException("Environment facade is not in opened state"); - } - - if (!_environment.isValid()) - { - throw new IllegalStateException("Environment is not valid"); - } - - if (_environment.getState() != ReplicatedEnvironment.State.MASTER) - { - throw new IllegalStateException("Databases can only be opened on Master node"); - } - - for (String databaseName : databaseNames) - { - _databases.put(databaseName, new DatabaseHolder(dbConfig)); - } - for (String databaseName : databaseNames) - { - DatabaseHolder holder = _databases.get(databaseName); - openDatabaseInternally(databaseName, holder); - } - } - - private void openDatabaseInternally(String databaseName, DatabaseHolder holder) - { - LOGGER.debug("Opening database " + databaseName + " on " + _prettyGroupNodeName); - Database database = _environment.openDatabase(null, databaseName, holder.getConfig()); - holder.setDatabase(database); - } - - @Override - public Database getOpenDatabase(String name) - { - if (_state.get() != State.OPEN) - { - throw new IllegalStateException("Environment facade is not in opened state"); - } - - if (!_environment.isValid()) - { - throw new IllegalStateException("Environment is not valid"); - } - DatabaseHolder databaseHolder = _databases.get(name); - if (databaseHolder == null) - { - throw new IllegalArgumentException("Database with name '" + name + "' has never been requested to be opened"); - } - Database database = databaseHolder.getDatabase(); - if (database == null) - { - throw new IllegalArgumentException("Database with name '" + name + "' has not been opened"); - } - return database; - } - - @Override - public void stateChange(final StateChangeEvent stateChangeEvent) - { - _groupChangeExecutor.submit(new Runnable() - { - @Override - public void run() - { - stateChanged(stateChangeEvent); - } - }); - } - - private void stateChanged(StateChangeEvent stateChangeEvent) - { - ReplicatedEnvironment.State state = stateChangeEvent.getState(); - LOGGER.info("The node '" + _prettyGroupNodeName + "' state is " + state); - if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER) - { - if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN)) - { - LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName); - _joinTime = System.currentTimeMillis(); - } - } - - if (state == ReplicatedEnvironment.State.MASTER) - { - reopenDatabases(); - StateChangeListener listener = _stateChangeListener.get(); - LOGGER.debug("Application state change listener " + listener); - if (listener != null) - { - listener.stateChange(stateChangeEvent); - } - } - else - { - if (_state.get() != State.CLOSING && _state.get() != State.CLOSED) - { - StateChangeListener listener = _stateChangeListener.get(); - if (listener != null) - { - listener.stateChange(stateChangeEvent); - } - } - } - } - - private void reopenDatabases() - { - DatabaseConfig pingDbConfig = new DatabaseConfig(); - pingDbConfig.setTransactional(true); - pingDbConfig.setAllowCreate(true); - - _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig)); - - for (Map.Entry entry : _databases.entrySet()) - { - openDatabaseInternally(entry.getKey(), entry.getValue()); - } - } - - public String getGroupName() - { - return _groupName; - } - - public String getNodeName() - { - return _nodeName; - } - - public String getHostPort() - { - return _nodeHostPort; - } - - public String getHelperHostPort() - { - return _helperHostPort; - } - - public String getDurability() - { - return _durability.toString(); - } - - public boolean isCoalescingSync() - { - return _coalescingSync; - } - - public String getNodeState() - { - ReplicatedEnvironment.State state = _environment.getState(); - return state.toString(); - } - - public boolean isDesignatedPrimary() - { - return _environment.getRepMutableConfig().getDesignatedPrimary(); - } - - public int getQuorumOverride() - { - return _environment.getRepMutableConfig().getElectableGroupSizeOverride(); - } - - public List> getGroupMembers() - { - List> members = new ArrayList>(); - - for (ReplicationNode node : _environment.getGroup().getNodes()) - { - Map nodeMap = new HashMap(); - nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, node.getName()); - nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort()); - members.add(nodeMap); - } - - return members; - } - - public void removeNodeFromGroup(final String nodeName) throws AMQStoreException - { - try - { - createReplicationGroupAdmin().removeMember(nodeName); - } - catch (OperationFailureException ofe) - { - // TODO: I am not sure about the exception handing here - throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe); - } - catch (DatabaseException e) - { - // TODO: I am not sure about the exception handing here - throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e); - } - } - - public void setDesignatedPrimary(final boolean isPrimary) throws AMQStoreException - { - try - { - final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig(); - final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary); - _environment.setRepMutableConfig(newConfig); - - if (LOGGER.isInfoEnabled()) - { - LOGGER.info("Node " + _prettyGroupNodeName + " successfully set as designated primary for group"); - } - - } - catch (DatabaseException e) - { - // TODO: I am not sure about the exception handing here - throw handleDatabaseException("Cannot set designated primary", e); - } - } - - public void updateAddress(final String nodeName, final String newHostName, final int newPort) throws AMQStoreException - { - try - { - createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort); - } - catch (OperationFailureException ofe) - { - // TODO: I am not sure about the exception handing here - throw new AMQStoreException("Failed to update address for '" + nodeName + "' with new host " + newHostName - + " and new port " + newPort + ". " + ofe.getMessage(), ofe); - } - catch (DatabaseException e) - { - // TODO: I am not sure about the exception handing here - throw handleDatabaseException("Failed to update address for '" + nodeName + "' with new host " + newHostName - + " and new port " + newPort + ". " + e.getMessage(), e); - } - } - - public int getPriority() - { - ReplicationMutableConfig repConfig = _environment.getRepMutableConfig(); - return repConfig.getNodePriority(); - } - - public int getElectableGroupSizeOverride() - { - ReplicationMutableConfig repConfig = _environment.getRepMutableConfig(); - return repConfig.getElectableGroupSizeOverride(); - } - - public long getJoinTime() - { - return _joinTime ; - } - - public String getLastKnownReplicationTransactionId() - { - return _lastKnownReplicationTransactionId; - } - - public ReplicatedEnvironment getEnvironment() - { - return _environment; - } - - public State getFacadeState() - { - return _state.get(); - } - - public void setReplicationGroupListener(ReplicationGroupListener replicationGroupListener) - { - if (_replicationGroupListener.compareAndSet(null, replicationGroupListener)) - { - notifyExistingRemoteReplicationNodes(replicationGroupListener); - } - else - { - throw new IllegalStateException("ReplicationGroupListener is already set on " + _prettyGroupNodeName); - } - } - - public void setStateChangeListener(StateChangeListener stateChangeListener) - { - if (_stateChangeListener.compareAndSet(null, stateChangeListener)) - { - _environment.setStateChangeListener(this); - } - else - { - throw new IllegalStateException("StateChangeListener is already set on " + _prettyGroupNodeName); - } - } - - private void populateExistingRemoteReplicationNodes() - { - ReplicationGroup group = _environment.getGroup(); - Set nodes = new HashSet(group.getElectableNodes()); - String localNodeName = getNodeName(); - - for (ReplicationNode replicationNode : nodes) - { - String discoveredNodeName = replicationNode.getName(); - if (!discoveredNodeName.equals(localNodeName)) - { - RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName()); - - _remoteReplicationNodes.put(replicationNode.getName(), remoteNode); - } - } - } - - private void notifyExistingRemoteReplicationNodes(ReplicationGroupListener listener) - { - for (org.apache.qpid.server.model.ReplicationNode value : _remoteReplicationNodes.values()) - { - listener.onReplicationNodeRecovered(value); - } - } - - private ReplicationGroupAdmin createReplicationGroupAdmin() - { - final Set helpers = new HashSet(); - helpers.addAll(_environment.getRepConfig().getHelperSockets()); - - final ReplicationConfig repConfig = _environment.getRepConfig(); - helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort())); - - return new ReplicationGroupAdmin(_groupName, helpers); - } - - private void closeEnvironment() - { - // Clean the log before closing. This makes sure it doesn't contain - // redundant data. Closing without doing this means the cleaner may not - // get a chance to finish. - try - { - if (_environment.isValid()) - { - _environment.cleanLog(); - } - } - finally - { - _environment.close(); - _environment = null; - } - } - - private void restartEnvironment(DatabaseException dbe) throws AMQStoreException - { - LOGGER.info("Restarting environment"); - - closeEnvironmentSafely(); - - _environment = createEnvironment(); - - if (_stateChangeListener.get() != null) - { - _environment.setStateChangeListener(this); - } - - LOGGER.info("Environment is restarted"); - } - - private void closeEnvironmentSafely() - { - Environment environment = _environment; - if (environment != null) - { - try - { - if (environment.isValid()) - { - try - { - closeDatabases(); - } - catch(Exception e) - { - LOGGER.warn("Ignoring an exception whilst closing databases", e); - } - } - environment.close(); - } - catch (EnvironmentFailureException efe) - { - LOGGER.warn("Ignoring an exception whilst closing environment", efe); - } - } - } - - private void closeDatabases() - { - RuntimeException firstThrownException = null; - LOGGER.debug("Closing databases " + _databases); - for (Map.Entry entry : _databases.entrySet()) - { - DatabaseHolder databaseHolder = entry.getValue(); - Database database = databaseHolder.getDatabase(); - if (database != null) - { - try - { - LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName); - database.close(); - } - catch(RuntimeException e) - { - LOGGER.error("Failed to close database on " + _prettyGroupNodeName, e); - if (firstThrownException == null) - { - firstThrownException = e; - } - } - finally - { - databaseHolder.setDatabase(null); - } - } - } - if (firstThrownException != null) - { - throw firstThrownException; - } - } - - private ReplicatedEnvironment createEnvironment() - { - if (LOGGER.isInfoEnabled()) - { - LOGGER.info("Creating environment"); - LOGGER.info("Environment path " + _environmentPath); - LOGGER.info("Group name " + _groupName); - LOGGER.info("Node name " + _nodeName); - LOGGER.info("Node host port " + _nodeHostPort); - LOGGER.info("Helper host port " + _helperHostPort); - LOGGER.info("Durability " + _durability); - LOGGER.info("Coalescing sync " + _coalescingSync); - LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary); - } - - Map replicationEnvironmentSettings = new HashMap(REPCONFIG_DEFAULTS); - if (_replicationEnvironmentParameters != null && !_replicationEnvironmentParameters.isEmpty()) - { - replicationEnvironmentSettings.putAll(_replicationEnvironmentParameters); - } - Map environmentSettings = new HashMap(EnvironmentFacade.ENVCONFIG_DEFAULTS); - if (_environmentParameters != null && !_environmentParameters.isEmpty()) - { - environmentSettings.putAll(_environmentParameters); - } - - final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort); - replicationConfig.setHelperHosts(_helperHostPort); - replicationConfig.setDesignatedPrimary(_designatedPrimary); - - for (Map.Entry configItem : replicationEnvironmentSettings.entrySet()) - { - if (LOGGER.isInfoEnabled()) - { - LOGGER.info("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); - } - replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue()); - } - - EnvironmentConfig envConfig = new EnvironmentConfig(); - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); - envConfig.setDurability(_durability); - - for (Map.Entry configItem : environmentSettings.entrySet()) - { - if (LOGGER.isInfoEnabled()) - { - LOGGER.info("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); - } - envConfig.setConfigParam(configItem.getKey(), configItem.getValue()); - } - - ReplicatedEnvironment environment = null; - File environmentPathFile = new File(_environmentPath); - try - { - environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); - } - catch (final InsufficientLogException ile) - { - LOGGER.info("InsufficientLogException thrown and so full network restore required", ile); - NetworkRestore restore = new NetworkRestore(); - NetworkRestoreConfig config = new NetworkRestoreConfig(); - config.setRetainLogFiles(false); - restore.execute(ile, config); - environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); - } - return environment; - } - - @Override - public Committer createCommitter(String name) - { - if (_coalescingSync) - { - return new CoalescingCommiter(name, this); - } - else - { - return Committer.IMMEDIATE_FUTURE_COMMITTER; - } - } - - private final class GroupChangeLearner implements Runnable - { - @Override - public void run() - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Checking for changes in the group " + _groupName); - } - - ReplicatedEnvironment env = _environment; - ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get(); - if (env != null && env.isValid()) - { - ReplicationGroup group = env.getGroup(); - Set nodes = new HashSet(group.getElectableNodes()); - String localNodeName = getNodeName(); - - Map removalMap = new HashMap(_remoteReplicationNodes); - for (ReplicationNode replicationNode : nodes) - { - String discoveredNodeName = replicationNode.getName(); - if (!discoveredNodeName.equals(localNodeName)) - { - if (!_remoteReplicationNodes.containsKey(discoveredNodeName)) - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + _groupName + "'"); - } - - RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName()); - - _remoteReplicationNodes.put(discoveredNodeName, remoteNode); - if (replicationGroupListener != null) - { - replicationGroupListener.onReplicationNodeAddedToGroup(remoteNode); - } - } - else - { - removalMap.remove(discoveredNodeName); - } - } - } - - if (!removalMap.isEmpty()) - { - for (Map.Entry replicationNodeEntry : removalMap.entrySet()) - { - String replicationNodeName = replicationNodeEntry.getKey(); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Remote replication node removed '" + replicationNodeName + "' from '" + _groupName + "'"); - } - _remoteReplicationNodes.remove(replicationNodeName); - if (replicationGroupListener != null) - { - replicationGroupListener.onReplicationNodeRemovedFromGroup(replicationNodeEntry.getValue()); - } - } - } - } - } - } - - //TODO: move the class into external class - private class RemoteNodeStateLearner implements Callable - { - private Map _previousGroupState = Collections.emptyMap(); - @Override - public Void call() - { - long remoteNodeMonitorInterval = _remoteReplicationNodeFactory.getRemoteNodeMonitorInterval(); - try - { - Set> futures = new HashSet>(); - for (final RemoteReplicationNode node : _remoteReplicationNodes.values()) - { - Future future = _groupChangeExecutor.submit(new Callable() - { - @Override - public Void call() - { - node.updateNodeState(); - return null; - } - }); - futures.add(future); - } - - for (Future future : futures) - { - try - { - future.get(remoteNodeMonitorInterval, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - } - catch (ExecutionException e) - { - LOGGER.warn("Cannot update node state for group " + _groupName, e.getCause()); - } - catch (TimeoutException e) - { - LOGGER.warn("Timeout whilst updating node state for group " + _groupName); - future.cancel(true); - } - } - - if (ReplicatedEnvironment.State.MASTER == _environment.getState()) - { - Map currentGroupState = new HashMap(); - for (final RemoteReplicationNode node : _remoteReplicationNodes.values()) - { - currentGroupState.put(node.getName(), (String)node.getAttribute(org.apache.qpid.server.model.ReplicationNode.ROLE)); - } - boolean stateChanged = !_previousGroupState.equals(currentGroupState); - _previousGroupState = currentGroupState; - if (stateChanged && State.OPEN == _state.get()) - { - new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this); - } - } - } - finally - { - _groupChangeExecutor.schedule(this, remoteNodeMonitorInterval, TimeUnit.MILLISECONDS); - } - return null; - } - } - - private class LoggingAsyncExceptionListener implements ExceptionListener - { - @Override - public void exceptionThrown(ExceptionEvent event) - { - LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException()); - } - } - - public static enum State - { - INITIAL, // TODO unused remove - OPENING, - OPEN, - RESTARTING, - CLOSING, - CLOSED - } - - private static class DatabaseHolder - { - private final DatabaseConfig _config; - private Database _database; - - public DatabaseHolder(DatabaseConfig config) - { - _config = config; - } - - public Database getDatabase() - { - return _database; - } - - public void setDatabase(Database database) - { - _database = database; - } - - public DatabaseConfig getConfig() - { - return _config; - } - - @Override - public String toString() - { - return "DatabaseHolder [_config=" + _config + ", _database=" + _database + "]"; - } - - } -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java deleted file mode 100644 index 324b7b647a..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.model.ReplicationNode; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNode; -import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode; -import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory; - -import com.sleepycat.je.Durability; -import com.sleepycat.je.Durability.SyncPolicy; - -public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory -{ - - @Override - public EnvironmentFacade createEnvironmentFacade(String storeLocation, VirtualHost virtualHost) - { - Collection replicationNodes = virtualHost.getChildren(ReplicationNode.class); - if (replicationNodes == null || replicationNodes.size() != 1) - { - throw new IllegalStateException("Expected exactly one replication node but got " + (replicationNodes==null ? 0 :replicationNodes.size()) + " nodes"); - } - ReplicationNode localNode = replicationNodes.iterator().next(); - if (!(localNode instanceof LocalReplicationNode)) - { - throw new IllegalStateException("Cannot find local replication node among virtual host nodes"); - } - String durability = (String)localNode.getAttribute(ReplicationNode.DURABILITY); - Boolean coalescingSync = (Boolean)localNode.getAttribute(ReplicationNode.COALESCING_SYNC); - - if (coalescingSync && Durability.parse(durability).getLocalSync() == SyncPolicy.SYNC) - { - throw new IllegalConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC - + "! Please set highAvailability.coalescingSync to false in store configuration."); - } - - ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(storeLocation, localNode, new RemoteReplicationNodeFactoryImpl(virtualHost)); - ((LocalReplicationNode)localNode).setReplicatedEnvironmentFacade(facade); - return facade; - } - - static class RemoteReplicationNodeFactoryImpl implements RemoteReplicationNodeFactory - { - private VirtualHost _virtualHost; - - public RemoteReplicationNodeFactoryImpl(VirtualHost virtualHost) - { - _virtualHost = virtualHost; - } - - @Override - public RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName) - { - Map attributes = new HashMap(); - attributes.put(ReplicationNode.NAME, replicationNode.getName()); - attributes.put(ReplicationNode.GROUP_NAME, groupName); - attributes.put(ReplicationNode.HOST_PORT, replicationNode.getHostName() + ":" + replicationNode.getPort()); - return new RemoteReplicationNode(replicationNode, groupName, _virtualHost, _virtualHost.getTaskExecutor()); - } - - @Override - public long getRemoteNodeMonitorInterval() - { - return (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL); - } - } -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index 4bf228c478..3b6eef832b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -32,21 +32,35 @@ import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.ExceptionEvent; -import com.sleepycat.je.ExceptionListener; public class StandardEnvironmentFacade implements EnvironmentFacade { private static final Logger LOGGER = Logger.getLogger(StandardEnvironmentFacade.class); public static final String TYPE = "BDB"; - private Environment _environment; + private final String _storePath; private final Map _databases = new HashMap(); + private Environment _environment; + public StandardEnvironmentFacade(String storePath, Map attributes) { + _storePath = storePath; - LOGGER.info("BDB message store using environment path " + storePath); + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Creating environment at environment path " + _storePath); + } + + File environmentPath = new File(storePath); + if (!environmentPath.exists()) + { + if (!environmentPath.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } + } EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); @@ -60,26 +74,9 @@ public class StandardEnvironmentFacade implements EnvironmentFacade envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); - try - { - _environment = new Environment(new File(storePath), envConfig); - } - catch (DatabaseException de) - { - if (de.getMessage().contains("Environment.setAllowCreate is false")) - { - // Allow the creation this time - envConfig.setAllowCreate(true); - _environment = new Environment(new File(storePath), envConfig); - } - else - { - throw de; - } - } + _environment = new Environment(environmentPath, envConfig); } - @Override public void commit(com.sleepycat.je.Transaction tx) throws AMQStoreException { @@ -196,15 +193,6 @@ public class StandardEnvironmentFacade implements EnvironmentFacade return new AMQStoreException(contextMessage, e); } - private class LoggingAsyncExceptionListener implements ExceptionListener - { - @Override - public void exceptionThrown(ExceptionEvent event) - { - LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException()); - } - } - @Override public void openDatabases(DatabaseConfig dbConfig, String... databaseNames) { @@ -232,4 +220,10 @@ public class StandardEnvironmentFacade implements EnvironmentFacade return new CoalescingCommiter(name, this); } + @Override + public String getStoreLocation() + { + return _storePath; + } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java index 7ffee19d78..384ceba98a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java @@ -20,9 +20,11 @@ */ package org.apache.qpid.server.store.berkeleydb; +import java.io.File; import java.util.HashMap; import java.util.Map; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.VirtualHost; public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory @@ -30,17 +32,45 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor @SuppressWarnings("unchecked") @Override - public EnvironmentFacade createEnvironmentFacade(String storePath, VirtualHost virtualHost) + public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore) { Map envConfigMap = new HashMap(); envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS); - Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig"); - if (bdbEnvConfigAttr instanceof Map) + Object environmentConfigurationAttributes = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION); + if (environmentConfigurationAttributes instanceof Map) { - envConfigMap.putAll((Map) bdbEnvConfigAttr); + envConfigMap.putAll((Map) environmentConfigurationAttributes); } - return new StandardEnvironmentFacade(storePath, envConfigMap); + + String name = virtualHost.getName(); + final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name; + + String storeLocation; + if(isMessageStore) + { + storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + if(storeLocation == null) + { + storeLocation = defaultPath; + } + } + else // we are acting only as the durable config store + { + storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); + if(storeLocation == null) + { + storeLocation = defaultPath; + } + } + + return new StandardEnvironmentFacade(storeLocation, envConfigMap); + } + + @Override + public String getType() + { + return StandardEnvironmentFacade.TYPE; } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java index ca3e858f01..38fdf34196 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java @@ -25,7 +25,6 @@ import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Transaction; @@ -35,7 +34,6 @@ public class DatabasePinger public static final String PING_DATABASE_NAME = "PINGDB"; private static final int ID = 0; - public void pingDb(EnvironmentFacade facade) { try diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java index b4a83e416e..556bcc54c3 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java @@ -37,7 +37,6 @@ import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.adapter.AbstractAdapter; import org.apache.qpid.server.model.adapter.NoStatistics; -import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ParameterizedTypeImpl; @@ -110,6 +109,11 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication { throw new IllegalConfigurationException("Helper host and port attribute is not specified"); } + Object storePath = attributes.get(STORE_PATH); + if (storePath == null || storePath.equals("")) + { + throw new IllegalConfigurationException("Store path is not specified for the replication node"); + } return attributes; } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java new file mode 100644 index 0000000000..6c4f33364a --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -0,0 +1,1029 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC; +import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY; +import static org.apache.qpid.server.model.ReplicationNode.DURABILITY; +import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME; +import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT; +import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT; +import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS; +import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS; +import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH; + +import java.io.File; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.replication.ReplicationGroupListener; +import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; +import org.apache.qpid.server.store.berkeleydb.Committer; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener; +import org.apache.qpid.server.util.DaemonThreadFactory; + +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Durability; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.EnvironmentFailureException; +import com.sleepycat.je.OperationFailureException; +import com.sleepycat.je.Transaction; +import com.sleepycat.je.rep.InsufficientLogException; +import com.sleepycat.je.rep.InsufficientReplicasException; +import com.sleepycat.je.rep.NetworkRestore; +import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.ReplicationGroup; +import com.sleepycat.je.rep.ReplicationMutableConfig; +import com.sleepycat.je.rep.ReplicationNode; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; +import com.sleepycat.je.rep.util.ReplicationGroupAdmin; +import com.sleepycat.je.utilint.PropUtil; + +public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener +{ + public static final String GROUP_CHECK_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.group_check_interval"; + + private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class); + private static final long DEFAULT_GROUP_CHECK_INTERVAL = 1000l; + private static final long GROUP_CHECK_INTERVAL = Long.getLong(GROUP_CHECK_INTERVAL_PROPERTY_NAME, DEFAULT_GROUP_CHECK_INTERVAL); + + @SuppressWarnings("serial") + private static final Map REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap() + {{ + /** + * Parameter decreased as the 24h default may lead very large log files for most users. + */ + put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h"); + /** + * Parameter increased as the 5 s default may lead to spurious timeouts. + */ + put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s"); + /** + * Parameter increased as the 10 s default may lead to spurious timeouts. + */ + put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s"); + /** + * Parameter increased as the 10 h default may cause user confusion. + */ + put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min"); + /** + * Parameter changed from default true so we adopt immediately adopt the new behaviour early. False + * is scheduled to become default after JE 5.0.48. + */ + put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString()); + /** + * Parameter decreased as a default 5min interval may lead to bigger data losses on Node + * with NO_SYN durability in case if such Node crushes. + */ + put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); + + /** + * Timeout to transit into UNKNOWN state if the majority is not available. + * By default it is switched off. + */ + put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s"); + }}); + + public static final String TYPE = "BDB-HA"; + + // TODO: JMX will change to observe the model, at that point these names will disappear + public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; + public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; + + private final String _prettyGroupNodeName; + private final String _groupName; + private final String _nodeName; + private final String _nodeHostPort; + private final String _helperHostPort; + private final Durability _durability; + private final boolean _designatedPrimary; + private final boolean _coalescingSync; + private final File _environmentDirectory; + private final Map _environmentParameters; + private final Map _replicationEnvironmentParameters; + private final ExecutorService _restartEnvironmentExecutor; + private final ScheduledExecutorService _groupChangeExecutor; + private final AtomicReference _state = new AtomicReference(State.OPENING); + private final ConcurrentMap _databases = new ConcurrentHashMap(); + private final ConcurrentMap _remoteReplicationNodes = new ConcurrentHashMap(); + private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory; + + private final AtomicReference _replicationGroupListener = new AtomicReference(); + private final AtomicReference _stateChangeListener = new AtomicReference(); + private volatile ReplicatedEnvironment _environment; + private long _joinTime; + private String _lastKnownReplicationTransactionId; + + @SuppressWarnings("unchecked") + public ReplicatedEnvironmentFacade(org.apache.qpid.server.model.ReplicationNode replicationNode, + RemoteReplicationNodeFactory remoteReplicationNodeFactory) + { + _environmentDirectory = new File((String)replicationNode.getAttribute(STORE_PATH)); + if (!_environmentDirectory.exists()) + { + if (!_environmentDirectory.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + _environmentDirectory + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } + } + + _groupName = (String)replicationNode.getAttribute(GROUP_NAME); + _nodeName = replicationNode.getName(); + _nodeHostPort = (String)replicationNode.getAttribute(HOST_PORT);; + _helperHostPort = (String)replicationNode.getAttribute(HELPER_HOST_PORT); + _durability = Durability.parse((String)replicationNode.getAttribute(DURABILITY)); + _designatedPrimary = (Boolean)replicationNode.getAttribute(DESIGNATED_PRIMARY); + _coalescingSync = (Boolean)replicationNode.getAttribute(COALESCING_SYNC); + _environmentParameters = (Map)replicationNode.getAttribute(PARAMETERS); + _replicationEnvironmentParameters = (Map)replicationNode.getAttribute(REPLICATION_PARAMETERS); + _prettyGroupNodeName = _groupName + ":" + _nodeName; + + _restartEnvironmentExecutor = Executors.newFixedThreadPool(1, new DaemonThreadFactory("Environment-Starter:" + _prettyGroupNodeName)); + _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName)); + + _remoteReplicationNodeFactory = remoteReplicationNodeFactory; + _groupChangeExecutor.scheduleWithFixedDelay(new GroupChangeLearner(), 0, GROUP_CHECK_INTERVAL, TimeUnit.MILLISECONDS); + _groupChangeExecutor.schedule(new RemoteNodeStateLearner(), _remoteReplicationNodeFactory.getRemoteNodeMonitorInterval(), TimeUnit.MILLISECONDS); + + // create environment in a separate thread to avoid renaming of the current thread by JE + _environment = createEnvironment(true); + populateExistingRemoteReplicationNodes(); + } + + @Override + public void commit(final Transaction tx) throws AMQStoreException + { + try + { + // Using commit() instead of commitNoSync() for the HA store to allow + // the HA durability configuration to influence resulting behaviour. + tx.commit(); + } + catch (DatabaseException de) + { + throw handleDatabaseException("Got DatabaseException on commit, closing environment", de); + } + } + + @Override + public void close() + { + if (_state.compareAndSet(State.OPENING, State.CLOSING) || + _state.compareAndSet(State.OPEN, State.CLOSING) || + _state.compareAndSet(State.RESTARTING, State.CLOSING) ) + { + try + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName); + } + + _restartEnvironmentExecutor.shutdown(); + _groupChangeExecutor.shutdown(); + closeDatabases(); + closeEnvironment(); + } + finally + { + _state.compareAndSet(State.CLOSING, State.CLOSED); + } + } + } + + @Override + public AMQStoreException handleDatabaseException(String contextMessage, final DatabaseException dbe) + { + boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException); + if (restart) + { + if (_state.compareAndSet(State.OPEN, State.RESTARTING)) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe); + } + _restartEnvironmentExecutor.execute(new Runnable() + { + @Override + public void run() + { + try + { + restartEnvironment(dbe); + } + catch (Exception e) + { + LOGGER.error("Exception on environment restart", e); + } + } + }); + + } + else + { + LOGGER.info("Cannot restart environment because of facade state: " + _state.get()); + } + } + return new AMQStoreException(contextMessage, dbe); + } + + @Override + public void openDatabases(DatabaseConfig dbConfig, String... databaseNames) + { + if (_state.get() != State.OPEN) + { + throw new IllegalStateException("Environment facade is not in opened state"); + } + + if (!_environment.isValid()) + { + throw new IllegalStateException("Environment is not valid"); + } + + if (_environment.getState() != ReplicatedEnvironment.State.MASTER) + { + throw new IllegalStateException("Databases can only be opened on Master node"); + } + + for (String databaseName : databaseNames) + { + _databases.put(databaseName, new DatabaseHolder(dbConfig)); + } + for (String databaseName : databaseNames) + { + DatabaseHolder holder = _databases.get(databaseName); + openDatabaseInternally(databaseName, holder); + } + } + + private void openDatabaseInternally(String databaseName, DatabaseHolder holder) + { + Database database = _environment.openDatabase(null, databaseName, holder.getConfig()); + holder.setDatabase(database); + } + + @Override + public Database getOpenDatabase(String name) + { + if (_state.get() != State.OPEN) + { + throw new IllegalStateException("Environment facade is not in opened state"); + } + + if (!_environment.isValid()) + { + throw new IllegalStateException("Environment is not valid"); + } + DatabaseHolder databaseHolder = _databases.get(name); + if (databaseHolder == null) + { + throw new IllegalArgumentException("Database with name '" + name + "' has never been requested to be opened"); + } + Database database = databaseHolder.getDatabase(); + if (database == null) + { + throw new IllegalArgumentException("Database with name '" + name + "' has not been opened"); + } + return database; + } + + @Override + public String getStoreLocation() + { + return _environmentDirectory.getAbsolutePath(); + } + + @Override + public void stateChange(final StateChangeEvent stateChangeEvent) + { + _groupChangeExecutor.submit(new Runnable() + { + @Override + public void run() + { + stateChanged(stateChangeEvent); + } + }); + } + + private void stateChanged(StateChangeEvent stateChangeEvent) + { + ReplicatedEnvironment.State state = stateChangeEvent.getState(); + LOGGER.info("The node '" + _prettyGroupNodeName + "' state is " + state); + if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER) + { + if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN)) + { + LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName); + _joinTime = System.currentTimeMillis(); + } + } + + if (state == ReplicatedEnvironment.State.MASTER) + { + reopenDatabases(); + StateChangeListener listener = _stateChangeListener.get(); + + if (listener != null) + { + listener.stateChange(stateChangeEvent); + } + } + else + { + if (_state.get() != State.CLOSING && _state.get() != State.CLOSED) + { + StateChangeListener listener = _stateChangeListener.get(); + if (listener != null) + { + listener.stateChange(stateChangeEvent); + } + } + } + } + + private void reopenDatabases() + { + DatabaseConfig pingDbConfig = new DatabaseConfig(); + pingDbConfig.setTransactional(true); + pingDbConfig.setAllowCreate(true); + + _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig)); + + for (Map.Entry entry : _databases.entrySet()) + { + openDatabaseInternally(entry.getKey(), entry.getValue()); + } + } + + public String getGroupName() + { + return _groupName; + } + + public String getNodeName() + { + return _nodeName; + } + + public String getHostPort() + { + return _nodeHostPort; + } + + public String getHelperHostPort() + { + return _helperHostPort; + } + + public String getDurability() + { + return _durability.toString(); + } + + public boolean isCoalescingSync() + { + return _coalescingSync; + } + + public String getNodeState() + { + ReplicatedEnvironment.State state = _environment.getState(); + return state.toString(); + } + + public boolean isDesignatedPrimary() + { + return _environment.getRepMutableConfig().getDesignatedPrimary(); + } + + public int getQuorumOverride() + { + return _environment.getRepMutableConfig().getElectableGroupSizeOverride(); + } + + public List> getGroupMembers() + { + List> members = new ArrayList>(); + + for (ReplicationNode node : _environment.getGroup().getNodes()) + { + Map nodeMap = new HashMap(); + nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, node.getName()); + nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort()); + members.add(nodeMap); + } + + return members; + } + + public void removeNodeFromGroup(final String nodeName) throws AMQStoreException + { + try + { + createReplicationGroupAdmin().removeMember(nodeName); + } + catch (OperationFailureException ofe) + { + // TODO: I am not sure about the exception handing here + throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe); + } + catch (DatabaseException e) + { + // TODO: I am not sure about the exception handing here + throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e); + } + } + + public void setDesignatedPrimary(final boolean isPrimary) throws AMQStoreException + { + try + { + final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig(); + final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary); + _environment.setRepMutableConfig(newConfig); + + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Node " + _prettyGroupNodeName + " successfully set as designated primary for group"); + } + + } + catch (DatabaseException e) + { + // TODO: I am not sure about the exception handing here + throw handleDatabaseException("Cannot set designated primary", e); + } + } + + public void updateAddress(final String nodeName, final String newHostName, final int newPort) throws AMQStoreException + { + try + { + createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort); + } + catch (OperationFailureException ofe) + { + // TODO: I am not sure about the exception handing here + throw new AMQStoreException("Failed to update address for '" + nodeName + "' with new host " + newHostName + + " and new port " + newPort + ". " + ofe.getMessage(), ofe); + } + catch (DatabaseException e) + { + // TODO: I am not sure about the exception handing here + throw handleDatabaseException("Failed to update address for '" + nodeName + "' with new host " + newHostName + + " and new port " + newPort + ". " + e.getMessage(), e); + } + } + + public int getPriority() + { + ReplicationMutableConfig repConfig = _environment.getRepMutableConfig(); + return repConfig.getNodePriority(); + } + + public int getElectableGroupSizeOverride() + { + ReplicationMutableConfig repConfig = _environment.getRepMutableConfig(); + return repConfig.getElectableGroupSizeOverride(); + } + + public long getJoinTime() + { + return _joinTime ; + } + + public String getLastKnownReplicationTransactionId() + { + return _lastKnownReplicationTransactionId; + } + + public ReplicatedEnvironment getEnvironment() + { + return _environment; + } + + public State getFacadeState() + { + return _state.get(); + } + + public void setReplicationGroupListener(ReplicationGroupListener replicationGroupListener) + { + if (_replicationGroupListener.compareAndSet(null, replicationGroupListener)) + { + notifyExistingRemoteReplicationNodes(replicationGroupListener); + } + else + { + throw new IllegalStateException("ReplicationGroupListener is already set on " + _prettyGroupNodeName); + } + } + + public void setStateChangeListener(StateChangeListener stateChangeListener) + { + if (_stateChangeListener.compareAndSet(null, stateChangeListener)) + { + _environment.setStateChangeListener(this); + } + else + { + throw new IllegalStateException("StateChangeListener is already set on " + _prettyGroupNodeName); + } + } + + private void populateExistingRemoteReplicationNodes() + { + ReplicationGroup group = _environment.getGroup(); + Set nodes = new HashSet(group.getElectableNodes()); + String localNodeName = getNodeName(); + + for (ReplicationNode replicationNode : nodes) + { + String discoveredNodeName = replicationNode.getName(); + if (!discoveredNodeName.equals(localNodeName)) + { + RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName()); + + _remoteReplicationNodes.put(replicationNode.getName(), remoteNode); + } + } + } + + private void notifyExistingRemoteReplicationNodes(ReplicationGroupListener listener) + { + for (org.apache.qpid.server.model.ReplicationNode value : _remoteReplicationNodes.values()) + { + listener.onReplicationNodeRecovered(value); + } + } + + private ReplicationGroupAdmin createReplicationGroupAdmin() + { + final Set helpers = new HashSet(); + helpers.addAll(_environment.getRepConfig().getHelperSockets()); + + final ReplicationConfig repConfig = _environment.getRepConfig(); + helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort())); + + return new ReplicationGroupAdmin(_groupName, helpers); + } + + private void closeEnvironment() + { + // Clean the log before closing. This makes sure it doesn't contain + // redundant data. Closing without doing this means the cleaner may not + // get a chance to finish. + try + { + if (_environment.isValid()) + { + _environment.cleanLog(); + } + } + finally + { + _environment.close(); + _environment = null; + } + } + + private void restartEnvironment(DatabaseException dbe) throws AMQStoreException + { + LOGGER.info("Restarting environment"); + + closeEnvironmentSafely(); + + _environment = createEnvironment(false); + + if (_stateChangeListener.get() != null) + { + _environment.setStateChangeListener(this); + } + + LOGGER.info("Environment is restarted"); + } + + private void closeEnvironmentSafely() + { + Environment environment = _environment; + if (environment != null) + { + try + { + if (environment.isValid()) + { + try + { + closeDatabases(); + } + catch(Exception e) + { + LOGGER.warn("Ignoring an exception whilst closing databases", e); + } + } + environment.close(); + } + catch (EnvironmentFailureException efe) + { + LOGGER.warn("Ignoring an exception whilst closing environment", efe); + } + } + } + + private void closeDatabases() + { + RuntimeException firstThrownException = null; + for (Map.Entry entry : _databases.entrySet()) + { + DatabaseHolder databaseHolder = entry.getValue(); + Database database = databaseHolder.getDatabase(); + if (database != null) + { + try + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName); + } + + database.close(); + } + catch(RuntimeException e) + { + LOGGER.error("Failed to close database on " + _prettyGroupNodeName, e); + if (firstThrownException == null) + { + firstThrownException = e; + } + } + finally + { + databaseHolder.setDatabase(null); + } + } + } + if (firstThrownException != null) + { + throw firstThrownException; + } + } + + private ReplicatedEnvironment createEnvironment(boolean createEnvironmentInSeparateThread) + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Creating environment"); + LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath()); + LOGGER.info("Group name " + _groupName); + LOGGER.info("Node name " + _nodeName); + LOGGER.info("Node host port " + _nodeHostPort); + LOGGER.info("Helper host port " + _helperHostPort); + LOGGER.info("Durability " + _durability); + LOGGER.info("Coalescing sync " + _coalescingSync); + LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary); + } + + Map replicationEnvironmentSettings = new HashMap(REPCONFIG_DEFAULTS); + if (_replicationEnvironmentParameters != null && !_replicationEnvironmentParameters.isEmpty()) + { + replicationEnvironmentSettings.putAll(_replicationEnvironmentParameters); + } + Map environmentSettings = new HashMap(EnvironmentFacade.ENVCONFIG_DEFAULTS); + if (_environmentParameters != null && !_environmentParameters.isEmpty()) + { + environmentSettings.putAll(_environmentParameters); + } + + ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort); + replicationConfig.setHelperHosts(_helperHostPort); + replicationConfig.setDesignatedPrimary(_designatedPrimary); + + for (Map.Entry configItem : replicationEnvironmentSettings.entrySet()) + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); + } + replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue()); + } + + EnvironmentConfig envConfig = new EnvironmentConfig(); + envConfig.setAllowCreate(true); + envConfig.setTransactional(true); + envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); + envConfig.setDurability(_durability); + + for (Map.Entry configItem : environmentSettings.entrySet()) + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); + } + envConfig.setConfigParam(configItem.getKey(), configItem.getValue()); + } + + if (createEnvironmentInSeparateThread) + { + return createEnvironmentInSeparateThread(_environmentDirectory, envConfig, replicationConfig); + } + else + { + return createEnvironment(_environmentDirectory, envConfig, replicationConfig); + } + } + + private ReplicatedEnvironment createEnvironmentInSeparateThread(final File environmentPathFile, final EnvironmentConfig envConfig, + final ReplicationConfig replicationConfig) + { + Future environmentFuture = _restartEnvironmentExecutor.submit(new Callable(){ + @Override + public ReplicatedEnvironment call() throws Exception + { + String originalThreadName = Thread.currentThread().getName(); + try + { + return createEnvironment(environmentPathFile, envConfig, replicationConfig); + } + finally + { + Thread.currentThread().setName(originalThreadName); + } + }}); + + long setUpTimeOutMillis = PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT)); + try + { + return environmentFuture.get(setUpTimeOutMillis, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException("Environment creation was interrupted", e); + } + catch (ExecutionException e) + { + throw new RuntimeException("Unexpected exception on environment creation", e.getCause()); + } + catch (TimeoutException e) + { + throw new RuntimeException("JE environment has not been created in due time"); + } + } + + private ReplicatedEnvironment createEnvironment(File environmentPathFile, EnvironmentConfig envConfig, + final ReplicationConfig replicationConfig) + { + ReplicatedEnvironment environment = null; + try + { + environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); + } + catch (final InsufficientLogException ile) + { + LOGGER.info("InsufficientLogException thrown and so full network restore required", ile); + NetworkRestore restore = new NetworkRestore(); + NetworkRestoreConfig config = new NetworkRestoreConfig(); + config.setRetainLogFiles(false); + restore.execute(ile, config); + environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); + } + return environment; + } + + @Override + public Committer createCommitter(String name) + { + if (_coalescingSync) + { + return new CoalescingCommiter(name, this); + } + else + { + return Committer.IMMEDIATE_FUTURE_COMMITTER; + } + } + + private final class GroupChangeLearner implements Runnable + { + @Override + public void run() + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Checking for changes in the group " + _groupName); + } + + ReplicatedEnvironment env = _environment; + ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get(); + if (env != null && env.isValid()) + { + ReplicationGroup group = env.getGroup(); + Set nodes = new HashSet(group.getElectableNodes()); + String localNodeName = getNodeName(); + + Map removalMap = new HashMap(_remoteReplicationNodes); + for (ReplicationNode replicationNode : nodes) + { + String discoveredNodeName = replicationNode.getName(); + if (!discoveredNodeName.equals(localNodeName)) + { + if (!_remoteReplicationNodes.containsKey(discoveredNodeName)) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + _groupName + "'"); + } + + RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName()); + + _remoteReplicationNodes.put(discoveredNodeName, remoteNode); + if (replicationGroupListener != null) + { + replicationGroupListener.onReplicationNodeAddedToGroup(remoteNode); + } + } + else + { + removalMap.remove(discoveredNodeName); + } + } + } + + if (!removalMap.isEmpty()) + { + for (Map.Entry replicationNodeEntry : removalMap.entrySet()) + { + String replicationNodeName = replicationNodeEntry.getKey(); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Remote replication node removed '" + replicationNodeName + "' from '" + _groupName + "'"); + } + _remoteReplicationNodes.remove(replicationNodeName); + if (replicationGroupListener != null) + { + replicationGroupListener.onReplicationNodeRemovedFromGroup(replicationNodeEntry.getValue()); + } + } + } + } + } + } + + private class RemoteNodeStateLearner implements Callable + { + private Map _previousGroupState = Collections.emptyMap(); + @Override + public Void call() + { + long remoteNodeMonitorInterval = _remoteReplicationNodeFactory.getRemoteNodeMonitorInterval(); + try + { + Set> futures = new HashSet>(); + for (final RemoteReplicationNode node : _remoteReplicationNodes.values()) + { + Future future = _groupChangeExecutor.submit(new Callable() + { + @Override + public Void call() + { + node.updateNodeState(); + return null; + } + }); + futures.add(future); + } + + for (Future future : futures) + { + try + { + future.get(remoteNodeMonitorInterval, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + catch (ExecutionException e) + { + LOGGER.warn("Cannot update node state for group " + _groupName, e.getCause()); + } + catch (TimeoutException e) + { + LOGGER.warn("Timeout whilst updating node state for group " + _groupName); + future.cancel(true); + } + } + + if (ReplicatedEnvironment.State.MASTER == _environment.getState()) + { + Map currentGroupState = new HashMap(); + for (final RemoteReplicationNode node : _remoteReplicationNodes.values()) + { + currentGroupState.put(node.getName(), (String)node.getAttribute(org.apache.qpid.server.model.ReplicationNode.ROLE)); + } + boolean stateChanged = !_previousGroupState.equals(currentGroupState); + _previousGroupState = currentGroupState; + if (stateChanged && State.OPEN == _state.get()) + { + new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this); + } + } + } + finally + { + _groupChangeExecutor.schedule(this, remoteNodeMonitorInterval, TimeUnit.MILLISECONDS); + } + return null; + } + } + + public static enum State + { + OPENING, + OPEN, + RESTARTING, + CLOSING, + CLOSED + } + + private static class DatabaseHolder + { + private final DatabaseConfig _config; + private Database _database; + + public DatabaseHolder(DatabaseConfig config) + { + _config = config; + } + + public Database getDatabase() + { + return _database; + } + + public void setDatabase(Database database) + { + _database = database; + } + + public DatabaseConfig getConfig() + { + return _config; + } + + @Override + public String toString() + { + return "DatabaseHolder [_config=" + _config + ", _database=" + _database + "]"; + } + + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java new file mode 100644 index 0000000000..0ddd7134ac --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.model.ReplicationNode; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory; + +import com.sleepycat.je.Durability; +import com.sleepycat.je.Durability.SyncPolicy; + +public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory +{ + + @Override + public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore) + { + Collection replicationNodes = virtualHost.getChildren(ReplicationNode.class); + if (replicationNodes == null || replicationNodes.size() != 1) + { + throw new IllegalStateException("Expected exactly one replication node but got " + (replicationNodes==null ? 0 :replicationNodes.size()) + " nodes"); + } + ReplicationNode localNode = replicationNodes.iterator().next(); + if (!(localNode instanceof LocalReplicationNode)) + { + throw new IllegalStateException("Cannot find local replication node among virtual host nodes"); + } + String durability = (String)localNode.getAttribute(ReplicationNode.DURABILITY); + Boolean coalescingSync = (Boolean)localNode.getAttribute(ReplicationNode.COALESCING_SYNC); + + if (coalescingSync && Durability.parse(durability).getLocalSync() == SyncPolicy.SYNC) + { + throw new IllegalConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC + + "! Please set highAvailability.coalescingSync to false in store configuration."); + } + + ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(localNode, new RemoteReplicationNodeFactoryImpl(virtualHost)); + ((LocalReplicationNode)localNode).setReplicatedEnvironmentFacade(facade); + return facade; + } + + static class RemoteReplicationNodeFactoryImpl implements RemoteReplicationNodeFactory + { + private VirtualHost _virtualHost; + + public RemoteReplicationNodeFactoryImpl(VirtualHost virtualHost) + { + _virtualHost = virtualHost; + } + + @Override + public RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName) + { + Map attributes = new HashMap(); + attributes.put(ReplicationNode.NAME, replicationNode.getName()); + attributes.put(ReplicationNode.GROUP_NAME, groupName); + attributes.put(ReplicationNode.HOST_PORT, replicationNode.getHostName() + ":" + replicationNode.getPort()); + return new RemoteReplicationNode(replicationNode, groupName, _virtualHost, _virtualHost.getTaskExecutor()); + } + + @Override + public long getRemoteNodeMonitorInterval() + { + return (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL); + } + } + + @Override + public String getType() + { + return ReplicatedEnvironmentFacade.TYPE; + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java index 4684358190..7a645a6932 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java @@ -65,7 +65,7 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); Map envMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE); - when(virtualHost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(envMap); + when(virtualHost.getAttribute(eq(BDBMessageStore.ENVIRONMENT_CONFIGURATION))).thenReturn(envMap); when(virtualHost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE); when(virtualHost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java index 730001d849..48fb180984 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java @@ -22,20 +22,15 @@ package org.apache.qpid.server.store.berkeleydb; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreCreator; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.test.utils.QpidTestCase; public class MessageStoreCreatorTest extends QpidTestCase { - private static final String[] STORE_TYPES = {BDBMessageStore.TYPE}; - public void testMessageStoreCreator() { MessageStoreCreator messageStoreCreator = new MessageStoreCreator(); - for (String type : STORE_TYPES) - { - MessageStore store = messageStoreCreator.createMessageStore(type); - assertNotNull("Store of type " + type + " is not created", store); - } + String type = new BDBMessageStoreFactory().getType(); + MessageStore store = messageStoreCreator.createMessageStore(type); + assertNotNull("Store of type " + type + " is not created", store); } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java deleted file mode 100644 index f4bb79596a..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.apache.qpid.server.model.ReplicationNode; -import org.apache.qpid.server.replication.ReplicationGroupListener; - -class NoopReplicationGroupListener implements ReplicationGroupListener -{ - @Override - public void onReplicationNodeRecovered(ReplicationNode node) - { - } - - @Override - public void onReplicationNodeAddedToGroup(ReplicationNode node) - { - } - - @Override - public void onReplicationNodeRemovedFromGroup(ReplicationNode node) - { - } -} \ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java deleted file mode 100644 index 806c108acd..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java +++ /dev/null @@ -1,599 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC; -import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY; -import static org.apache.qpid.server.model.ReplicationNode.DURABILITY; -import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME; -import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT; -import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT; -import static org.apache.qpid.server.model.ReplicationNode.NAME; -import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.model.ReplicationNode; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.replication.ReplicationGroupListener; -import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode; -import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.test.utils.TestFileUtils; -import org.apache.qpid.util.FileUtils; - -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.Durability; -import com.sleepycat.je.Environment; -import com.sleepycat.je.rep.InsufficientReplicasException; -import com.sleepycat.je.rep.ReplicatedEnvironment.State; -import com.sleepycat.je.rep.ReplicationConfig; -import com.sleepycat.je.rep.StateChangeEvent; -import com.sleepycat.je.rep.StateChangeListener; - -public class ReplicatedEnvironmentFacadeTest extends QpidTestCase -{ - protected File _storePath; - private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort(); - private static final int LISTENER_TIMEOUT = 5; - private static final int WAIT_STATE_CHANGE_TIMEOUT = 30; - private static final String TEST_GROUP_NAME = "testGroupName"; - private static final String TEST_NODE_NAME = "testNodeName"; - private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT; - private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT; - private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString(); - private static final boolean TEST_DESIGNATED_PRIMARY = false; - private static final boolean TEST_COALESCING_SYNC = true; - private final Map _nodes = new HashMap(); - private VirtualHost _virtualHost = mock(VirtualHost.class); - private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = new ReplicatedEnvironmentFacadeFactory.RemoteReplicationNodeFactoryImpl(_virtualHost); - - public void setUp() throws Exception - { - super.setUp(); - - _storePath = TestFileUtils.createTestDirectory("bdb", true); - - when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL)).thenReturn(100L); - when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT)).thenReturn(100L); - } - - @Override - public void tearDown() throws Exception - { - try - { - for (EnvironmentFacade ef : _nodes.values()) - { - ef.close(); - } - } - finally - { - try - { - if (_storePath != null) - { - FileUtils.delete(_storePath, true); - } - } - finally - { - super.tearDown(); - } - } - } - public void testEnvironmentFacade() throws Exception - { - EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster(); - assertNotNull("Environment should not be null", ef); - Environment e = ef.getEnvironment(); - assertTrue("Environment is not valid", e.isValid()); - } - - public void testClose() throws Exception - { - EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster(); - ef.close(); - Environment e = ef.getEnvironment(); - - assertNull("Environment should be null after facade close", e); - } - - public void testOpenDatabases() throws Exception - { - EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster(); - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - ef.openDatabases(dbConfig, "test1", "test2"); - Database test1 = ef.getOpenDatabase("test1"); - Database test2 = ef.getOpenDatabase("test2"); - - assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); - assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName()); - } - - public void testGetOpenDatabaseForNonExistingDatabase() throws Exception - { - EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster(); - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - ef.openDatabases(dbConfig, "test1"); - Database test1 = ef.getOpenDatabase("test1"); - assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); - try - { - ef.getOpenDatabase("test2"); - fail("An exception should be thrown for the non existing database"); - } - catch(IllegalArgumentException e) - { - assertEquals("Unexpected exception message", "Database with name 'test2' has never been requested to be opened", e.getMessage()); - } - } - - public void testGetGroupName() throws Exception - { - assertEquals("Unexpected group name", TEST_GROUP_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getGroupName()); - } - - public void testGetNodeName() throws Exception - { - assertEquals("Unexpected group name", TEST_NODE_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getNodeName()); - } - - public void testGetNodeHostPort() throws Exception - { - assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, ((ReplicatedEnvironmentFacade) createMaster()).getHostPort()); - } - - public void testGetHelperHostPort() throws Exception - { - assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, ((ReplicatedEnvironmentFacade) createMaster()).getHelperHostPort()); - } - - public void testGetDurability() throws Exception - { - assertEquals("Unexpected durability", TEST_DURABILITY.toString(), ((ReplicatedEnvironmentFacade) createMaster()).getDurability()); - } - - public void testIsCoalescingSync() throws Exception - { - assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, ((ReplicatedEnvironmentFacade) createMaster()).isCoalescingSync()); - } - - public void testGetNodeState() throws Exception - { - assertEquals("Unexpected state", State.MASTER.name(), ((ReplicatedEnvironmentFacade) createMaster()).getNodeState()); - } - - public void testIsDesignatedPrimary() throws Exception - { - ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster(); - assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); - master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY); - assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); - } - - public void testGetGroupMembers() throws Exception - { - List> groupMembers = ((ReplicatedEnvironmentFacade) createMaster()).getGroupMembers(); - Map expectedMember = new HashMap(); - expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME); - expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT); - Set> expectedGroupMembers = Collections.singleton(expectedMember); - assertEquals("Unexpected group members", expectedGroupMembers, new HashSet>(groupMembers)); - } - - public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception - { - ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster(); - String nodeName2 = TEST_NODE_NAME + "_2"; - String host = "localhost"; - int port = getNextAvailable(TEST_NODE_PORT + 1); - String node2NodeHostPort = host + ":" + port; - - final AtomicInteger invocationCount = new AtomicInteger(); - final CountDownLatch nodeRecoveryLatch = new CountDownLatch(1); - ReplicationGroupListener listener = new NoopReplicationGroupListener() - { - @Override - public void onReplicationNodeRecovered(ReplicationNode node) - { - nodeRecoveryLatch.countDown(); - invocationCount.incrementAndGet(); - } - }; - - addReplica(nodeName2, node2NodeHostPort, listener); - - List> groupMembers = master.getGroupMembers(); - assertEquals("Unexpected number of nodes", 2, groupMembers.size()); - - assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); - } - - public void testReplicationGroupListenerHearsNodeAdded() throws Exception - { - final CountDownLatch nodeAddedLatch = new CountDownLatch(1); - final AtomicInteger invocationCount = new AtomicInteger(); - ReplicationGroupListener listener = new NoopReplicationGroupListener() - { - @Override - public void onReplicationNodeAddedToGroup(ReplicationNode node) - { - invocationCount.getAndIncrement(); - nodeAddedLatch.countDown(); - } - }; - - TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); - ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener); - assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - List> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers(); - assertEquals("Unexpected number of nodes at start of test", 1, initialGroupMembers.size()); - - String node2Name = TEST_NODE_NAME + "_2"; - String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1); - addReplica(node2Name, node2NodeHostPort); - - assertTrue("Listener not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - List> groupMembers = replicatedEnvironmentFacade.getGroupMembers(); - assertEquals("Unexpected number of nodes", 2, groupMembers.size()); - - assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); - } - - public void testReplicationGroupListenerHearsNodeRemoved() throws Exception - { - final CountDownLatch nodeDeletedLatch = new CountDownLatch(1); - final CountDownLatch nodeAddedLatch = new CountDownLatch(1); - final AtomicInteger invocationCount = new AtomicInteger(); - ReplicationGroupListener listener = new NoopReplicationGroupListener() - { - @Override - public void onReplicationNodeRecovered(ReplicationNode node) - { - nodeAddedLatch.countDown(); - } - - @Override - public void onReplicationNodeAddedToGroup(ReplicationNode node) - { - nodeAddedLatch.countDown(); - } - - @Override - public void onReplicationNodeRemovedFromGroup(ReplicationNode node) - { - invocationCount.getAndIncrement(); - nodeDeletedLatch.countDown(); - } - }; - - TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); - ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener); - assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - String node2Name = TEST_NODE_NAME + "_2"; - String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1); - addReplica(node2Name, node2NodeHostPort); - - List> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers(); - assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size()); - - // Need to await the listener hearing the addition of the node to the model. - assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - // Now remove the node and ensure we hear the event - replicatedEnvironmentFacade.removeNodeFromGroup(node2Name); - - assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - List> groupMembers = replicatedEnvironmentFacade.getGroupMembers(); - assertEquals("Unexpected number of nodes after node removal", 1, groupMembers.size()); - - assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); - } - - public void testMasterHearsRemoteNodeRoles() throws Exception - { - - final CountDownLatch nodeAddedLatch = new CountDownLatch(1); - final AtomicReference nodeRef = new AtomicReference(); - ReplicationGroupListener listener = new NoopReplicationGroupListener() - { - @Override - public void onReplicationNodeAddedToGroup(ReplicationNode node) - { - nodeRef.set(node); - nodeAddedLatch.countDown(); - } - }; - - TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); - ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener); - assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - String node2Name = TEST_NODE_NAME + "_2"; - String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1); - addReplica(node2Name, node2NodeHostPort); - - List> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers(); - assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size()); - - assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - RemoteReplicationNode remoteNode = (RemoteReplicationNode)nodeRef.get(); - assertEquals("Unexpcted node name", node2Name, remoteNode.getName()); - - // Need to poll to await the remote node updating itself - long timeout = System.currentTimeMillis() + 5000; - while(!State.REPLICA.name().equals(remoteNode.getAttribute(ReplicationNode.ROLE)) && System.currentTimeMillis() < timeout) - { - Thread.sleep(200); - } - - assertEquals("Unexpcted node role (after waiting)", State.REPLICA.name(), remoteNode.getAttribute(ReplicationNode.ROLE)); - assertNotNull("Replica node " + ReplicationNode.JOIN_TIME + " attribute is not set", remoteNode.getAttribute(ReplicationNode.JOIN_TIME)); - assertNotNull("Replica node " + ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID + " attribute is not set", remoteNode.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)); - } - - public void testRemoveNodeFromGroup() throws Exception - { - ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) createMaster(); - - String node2Name = TEST_NODE_NAME + "_2"; - String node2NodeHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1); - ReplicatedEnvironmentFacade ref2 = addReplica(node2Name, node2NodeHostPort); - - List> groupMembers = environmentFacade.getGroupMembers(); - assertEquals("Unexpected group members count", 2, groupMembers.size()); - ref2.close(); - - environmentFacade.removeNodeFromGroup(node2Name); - groupMembers = environmentFacade.getGroupMembers(); - assertEquals("Unexpected group members count", 1, groupMembers.size()); - } - - public void testSetDesignatedPrimary() throws Exception - { - ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) createMaster(); - environmentFacade.setDesignatedPrimary(false); - assertFalse("Unexpected designated primary", environmentFacade.isDesignatedPrimary()); - } - - public void testGetNodePriority() throws Exception - { - assertEquals("Unexpected node priority", 1, ((ReplicatedEnvironmentFacade) createMaster()).getPriority()); - } - - public void testGetElectableGroupSizeOverride() throws Exception - { - assertEquals("Unexpected Electable Group Size Override", 0, ((ReplicatedEnvironmentFacade) createMaster()).getElectableGroupSizeOverride()); - } - - public void testEnvironmentRestartOnInsufficientReplicas() throws Exception - { - ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster(); - - int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); - String replica1NodeName = TEST_NODE_NAME + "_1"; - String replica1NodeHostPort = "localhost:" + replica1Port; - ReplicatedEnvironmentFacade replica1 = addReplica(replica1NodeName, replica1NodeHostPort); - - int replica2Port = getNextAvailable(replica1Port + 1); - String replica2NodeName = TEST_NODE_NAME + "_2"; - String replica2NodeHostPort = "localhost:" + replica2Port; - ReplicatedEnvironmentFacade replica2 = addReplica(replica2NodeName, replica2NodeHostPort); - - String databaseName = "test"; - - DatabaseConfig dbConfig = createDatabase(master, databaseName); - - // close replicas - replica1.close(); - replica2.close(); - - Environment e = master.getEnvironment(); - Database db = master.getOpenDatabase(databaseName); - try - { - master.openDatabases(dbConfig, "test2"); - fail("Opening of new database without quorum should fail"); - } - catch(InsufficientReplicasException ex) - { - master.handleDatabaseException(null, ex); - } - - replica1 = addReplica(replica1NodeName, replica1NodeHostPort); - replica2 = addReplica(replica2NodeName, replica2NodeHostPort); - - // Need to poll to await the remote node updating itself - long timeout = System.currentTimeMillis() + 5000; - while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout) - { - Thread.sleep(200); - } - - assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(), - State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ); - - Environment e2 = master.getEnvironment(); - assertNotSame("Environment has not been restarted", e2, e); - - Database db1 = master.getOpenDatabase(databaseName); - assertNotSame("Database should be the re-created", db1, db); - } - - public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception - { - final CountDownLatch masterLatch = new CountDownLatch(1); - final AtomicInteger masterStateChangeCount = new AtomicInteger(); - final CountDownLatch unknownLatch = new CountDownLatch(1); - final AtomicInteger unknownStateChangeCount = new AtomicInteger(); - StateChangeListener stateChangeListener = new StateChangeListener() - { - @Override - public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException - { - if (stateChangeEvent.getState() == State.MASTER) - { - masterStateChangeCount.incrementAndGet(); - masterLatch.countDown(); - } - else if (stateChangeEvent.getState() == State.UNKNOWN) - { - unknownStateChangeCount.incrementAndGet(); - unknownLatch.countDown(); - } - } - }; - - addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener()); - assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - - int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); - String node1NodeHostPort = "localhost:" + replica1Port; - int replica2Port = getNextAvailable(replica1Port + 1); - String node2NodeHostPort = "localhost:" + replica2Port; - - ReplicatedEnvironmentFacade replica1 = addReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); - ReplicatedEnvironmentFacade replica2 = addReplica(TEST_NODE_NAME + "_2", node2NodeHostPort); - - // close replicas - replica1.close(); - replica2.close(); - - assertTrue("Environment should be recreated and go into unknown state", - unknownLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); - - assertEquals("Node made master an unexpected number of times", 1, masterStateChangeCount.get()); - assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get()); - } - - public void testCloseStateTransitions() throws Exception - { - ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster(); - - assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState()); - replicatedEnvironmentFacade.close(); - assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState()); - } - - private ReplicatedEnvironmentFacade createMaster() throws Exception - { - TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); - ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener()); - assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - return env; - } - - private ReplicatedEnvironmentFacade addReplica(String nodeName, String nodeHostPort) throws Exception - { - return addReplica(nodeName, nodeHostPort, new NoopReplicationGroupListener()); - } - - private ReplicatedEnvironmentFacade addReplica(String nodeName, String nodeHostPort, ReplicationGroupListener replicationGroupListener) - throws Exception - { - TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA); - ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, replicationGroupListener); - assertTrue("Replica " + nodeName + " was not started", testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); - return replicaEnvironmentFacade; - } - - private String createNodeWorkingFolder(String nodeName) - { - File nodeLocation = new File(_storePath, nodeName); - if (!nodeLocation.exists()) - { - nodeLocation.mkdirs(); - } - final String nodePath = nodeLocation.getAbsolutePath(); - return nodePath; - } - - private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary, - State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) - { - final String nodePath = createNodeWorkingFolder(nodeName); - ReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary); - ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(nodePath, node, _remoteReplicationNodeFactory); - ref.setReplicationGroupListener(replicationGroupListener); - ref.setStateChangeListener(stateChangeListener); - _nodes.put(nodeName, ref); - return ref; - } - - private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener groupChangeListener) - { - return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, groupChangeListener); - } - - private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName) throws AMQStoreException - { - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - environmentFacade.openDatabases(dbConfig, databaseName); - return dbConfig; - } - - private ReplicationNode createReplicationNodeMock(String nodeName, String nodeHostPort, boolean designatedPrimary) - { - ReplicationNode node = mock(ReplicationNode.class); - when(node.getAttribute(NAME)).thenReturn(nodeName); - when(node.getName()).thenReturn(nodeName); - when(node.getAttribute(HOST_PORT)).thenReturn(nodeHostPort); - when(node.getAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary); - when(node.getAttribute(GROUP_NAME)).thenReturn(TEST_GROUP_NAME); - when(node.getAttribute(HELPER_HOST_PORT)).thenReturn(TEST_NODE_HELPER_HOST_PORT); - when(node.getAttribute(DURABILITY)).thenReturn(TEST_DURABILITY); - when(node.getAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC); - - Map repConfig = new HashMap(); - repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s"); - repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); - when(node.getAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig); - return node; - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java index 5bad51fbc3..b19e18b204 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java @@ -24,7 +24,6 @@ import java.io.File; import java.util.Collections; import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.test.utils.TestFileUtils; import org.apache.qpid.util.FileUtils; import com.sleepycat.je.Database; @@ -39,7 +38,7 @@ public class StandardEnvironmentFacadeTest extends QpidTestCase protected void setUp() throws Exception { super.setUp(); - _storePath = TestFileUtils.createTestDirectory("bdb", true); + _storePath = new File(TMP_FOLDER + File.separator + "bdb" + File.separator + getTestName()); } protected void tearDown() throws Exception diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestStateChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestStateChangeListener.java deleted file mode 100644 index 5b5a507c1f..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestStateChangeListener.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import com.sleepycat.je.rep.StateChangeEvent; -import com.sleepycat.je.rep.StateChangeListener; -import com.sleepycat.je.rep.ReplicatedEnvironment.State; - -class TestStateChangeListener implements StateChangeListener -{ - private final State _expectedState; - private final CountDownLatch _latch; - - public TestStateChangeListener(State expectedState) - { - _expectedState = expectedState; - _latch = new CountDownLatch(1); - } - - @Override - public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException - { - if (stateChangeEvent.getState() == _expectedState) - { - _latch.countDown(); - } - } - - public boolean awaitForStateChange(long timeout, TimeUnit timeUnit) throws InterruptedException - { - return _latch.await(timeout, timeUnit); - } -} \ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java index 97fd927f01..7269988042 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java @@ -46,6 +46,7 @@ import org.apache.qpid.server.model.ReplicationNode; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.test.utils.QpidTestCase; @@ -185,7 +186,7 @@ public class VirtualHostTest extends QpidTestCase _host.setDesiredState(State.INITIALISING, State.ACTIVE); assertEquals("Unexpected host name", hostName, _host.getName()); assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType()); - assertEquals("Unexpected store type", BDBMessageStore.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE)); + assertEquals("Unexpected store type", new BDBMessageStoreFactory().getType(), _host.getAttribute(VirtualHost.STORE_TYPE)); assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH)); BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore(); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java index 138b4bc398..330abce5cf 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java @@ -32,7 +32,6 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.ReplicationNode; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade; import org.apache.qpid.test.utils.QpidTestCase; public class LocalReplicationNodeTest extends QpidTestCase @@ -184,6 +183,7 @@ public class LocalReplicationNodeTest extends QpidTestCase attributes.put(ReplicationNode.GROUP_NAME, "testGroup"); attributes.put(ReplicationNode.HOST_PORT, "localhost:5000"); attributes.put(ReplicationNode.HELPER_HOST_PORT, "localhost:5001"); + attributes.put(ReplicationNode.STORE_PATH, TMP_FOLDER + File.separator + getTestName()); return attributes; } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/NoopReplicationGroupListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/NoopReplicationGroupListener.java new file mode 100644 index 0000000000..21c902ae8f --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/NoopReplicationGroupListener.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import org.apache.qpid.server.model.ReplicationNode; +import org.apache.qpid.server.replication.ReplicationGroupListener; + +class NoopReplicationGroupListener implements ReplicationGroupListener +{ + @Override + public void onReplicationNodeRecovered(ReplicationNode node) + { + } + + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + } + + @Override + public void onReplicationNodeRemovedFromGroup(ReplicationNode node) + { + } +} \ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java new file mode 100644 index 0000000000..cea7d52d43 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -0,0 +1,594 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC; +import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY; +import static org.apache.qpid.server.model.ReplicationNode.DURABILITY; +import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME; +import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT; +import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT; +import static org.apache.qpid.server.model.ReplicationNode.NAME; +import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS; +import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.model.ReplicationNode; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.replication.ReplicationGroupListener; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode; +import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.utils.TestFileUtils; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.Durability; +import com.sleepycat.je.Environment; +import com.sleepycat.je.rep.InsufficientReplicasException; +import com.sleepycat.je.rep.ReplicatedEnvironment.State; +import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; + +public class ReplicatedEnvironmentFacadeTest extends QpidTestCase +{ + private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort(); + private static final int LISTENER_TIMEOUT = 5; + private static final int WAIT_STATE_CHANGE_TIMEOUT = 30; + private static final String TEST_GROUP_NAME = "testGroupName"; + private static final String TEST_NODE_NAME = "testNodeName"; + private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT; + private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT; + private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString(); + private static final boolean TEST_DESIGNATED_PRIMARY = false; + private static final boolean TEST_COALESCING_SYNC = true; + + private File _storePath; + private final Map _nodes = new HashMap(); + private VirtualHost _virtualHost = mock(VirtualHost.class); + private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = new ReplicatedEnvironmentFacadeFactory.RemoteReplicationNodeFactoryImpl(_virtualHost); + + public void setUp() throws Exception + { + super.setUp(); + + _storePath = TestFileUtils.createTestDirectory("bdb", true); + + when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL)).thenReturn(100L); + when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT)).thenReturn(100L); + } + + @Override + public void tearDown() throws Exception + { + try + { + for (EnvironmentFacade ef : _nodes.values()) + { + ef.close(); + } + } + finally + { + try + { + if (_storePath != null) + { + FileUtils.delete(_storePath, true); + } + } + finally + { + super.tearDown(); + } + } + } + public void testEnvironmentFacade() throws Exception + { + EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster(); + assertNotNull("Environment should not be null", ef); + Environment e = ef.getEnvironment(); + assertTrue("Environment is not valid", e.isValid()); + } + + public void testClose() throws Exception + { + EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster(); + ef.close(); + Environment e = ef.getEnvironment(); + + assertNull("Environment should be null after facade close", e); + } + + public void testOpenDatabases() throws Exception + { + EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1", "test2"); + Database test1 = ef.getOpenDatabase("test1"); + Database test2 = ef.getOpenDatabase("test2"); + + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName()); + } + + public void testGetOpenDatabaseForNonExistingDatabase() throws Exception + { + EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster(); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + ef.openDatabases(dbConfig, "test1"); + Database test1 = ef.getOpenDatabase("test1"); + assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); + try + { + ef.getOpenDatabase("test2"); + fail("An exception should be thrown for the non existing database"); + } + catch(IllegalArgumentException e) + { + assertEquals("Unexpected exception message", "Database with name 'test2' has never been requested to be opened", e.getMessage()); + } + } + + public void testGetGroupName() throws Exception + { + assertEquals("Unexpected group name", TEST_GROUP_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getGroupName()); + } + + public void testGetNodeName() throws Exception + { + assertEquals("Unexpected group name", TEST_NODE_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getNodeName()); + } + + public void testGetNodeHostPort() throws Exception + { + assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, ((ReplicatedEnvironmentFacade) createMaster()).getHostPort()); + } + + public void testGetHelperHostPort() throws Exception + { + assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, ((ReplicatedEnvironmentFacade) createMaster()).getHelperHostPort()); + } + + public void testGetDurability() throws Exception + { + assertEquals("Unexpected durability", TEST_DURABILITY.toString(), ((ReplicatedEnvironmentFacade) createMaster()).getDurability()); + } + + public void testIsCoalescingSync() throws Exception + { + assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, ((ReplicatedEnvironmentFacade) createMaster()).isCoalescingSync()); + } + + public void testGetNodeState() throws Exception + { + assertEquals("Unexpected state", State.MASTER.name(), ((ReplicatedEnvironmentFacade) createMaster()).getNodeState()); + } + + public void testIsDesignatedPrimary() throws Exception + { + ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster(); + assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); + master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY); + assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); + } + + public void testGetGroupMembers() throws Exception + { + List> groupMembers = ((ReplicatedEnvironmentFacade) createMaster()).getGroupMembers(); + Map expectedMember = new HashMap(); + expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME); + expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT); + Set> expectedGroupMembers = Collections.singleton(expectedMember); + assertEquals("Unexpected group members", expectedGroupMembers, new HashSet>(groupMembers)); + } + + public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception + { + ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster(); + String nodeName2 = TEST_NODE_NAME + "_2"; + String host = "localhost"; + int port = getNextAvailable(TEST_NODE_PORT + 1); + String node2NodeHostPort = host + ":" + port; + + final AtomicInteger invocationCount = new AtomicInteger(); + final CountDownLatch nodeRecoveryLatch = new CountDownLatch(1); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeRecovered(ReplicationNode node) + { + nodeRecoveryLatch.countDown(); + invocationCount.incrementAndGet(); + } + }; + + addReplica(nodeName2, node2NodeHostPort, listener); + + List> groupMembers = master.getGroupMembers(); + assertEquals("Unexpected number of nodes", 2, groupMembers.size()); + + assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); + } + + public void testReplicationGroupListenerHearsNodeAdded() throws Exception + { + final CountDownLatch nodeAddedLatch = new CountDownLatch(1); + final AtomicInteger invocationCount = new AtomicInteger(); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + invocationCount.getAndIncrement(); + nodeAddedLatch.countDown(); + } + }; + + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener); + assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + List> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers(); + assertEquals("Unexpected number of nodes at start of test", 1, initialGroupMembers.size()); + + String node2Name = TEST_NODE_NAME + "_2"; + String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1); + addReplica(node2Name, node2NodeHostPort); + + assertTrue("Listener not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + List> groupMembers = replicatedEnvironmentFacade.getGroupMembers(); + assertEquals("Unexpected number of nodes", 2, groupMembers.size()); + + assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); + } + + public void testReplicationGroupListenerHearsNodeRemoved() throws Exception + { + final CountDownLatch nodeDeletedLatch = new CountDownLatch(1); + final CountDownLatch nodeAddedLatch = new CountDownLatch(1); + final AtomicInteger invocationCount = new AtomicInteger(); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeRecovered(ReplicationNode node) + { + nodeAddedLatch.countDown(); + } + + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + nodeAddedLatch.countDown(); + } + + @Override + public void onReplicationNodeRemovedFromGroup(ReplicationNode node) + { + invocationCount.getAndIncrement(); + nodeDeletedLatch.countDown(); + } + }; + + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener); + assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + String node2Name = TEST_NODE_NAME + "_2"; + String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1); + addReplica(node2Name, node2NodeHostPort); + + List> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers(); + assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size()); + + // Need to await the listener hearing the addition of the node to the model. + assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + // Now remove the node and ensure we hear the event + replicatedEnvironmentFacade.removeNodeFromGroup(node2Name); + + assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + List> groupMembers = replicatedEnvironmentFacade.getGroupMembers(); + assertEquals("Unexpected number of nodes after node removal", 1, groupMembers.size()); + + assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); + } + + public void testMasterHearsRemoteNodeRoles() throws Exception + { + + final CountDownLatch nodeAddedLatch = new CountDownLatch(1); + final AtomicReference nodeRef = new AtomicReference(); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + nodeRef.set(node); + nodeAddedLatch.countDown(); + } + }; + + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener); + assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + String node2Name = TEST_NODE_NAME + "_2"; + String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1); + addReplica(node2Name, node2NodeHostPort); + + List> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers(); + assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size()); + + assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + RemoteReplicationNode remoteNode = (RemoteReplicationNode)nodeRef.get(); + assertEquals("Unexpcted node name", node2Name, remoteNode.getName()); + + // Need to poll to await the remote node updating itself + long timeout = System.currentTimeMillis() + 5000; + while(!State.REPLICA.name().equals(remoteNode.getAttribute(ReplicationNode.ROLE)) && System.currentTimeMillis() < timeout) + { + Thread.sleep(200); + } + + assertEquals("Unexpcted node role (after waiting)", State.REPLICA.name(), remoteNode.getAttribute(ReplicationNode.ROLE)); + assertNotNull("Replica node " + ReplicationNode.JOIN_TIME + " attribute is not set", remoteNode.getAttribute(ReplicationNode.JOIN_TIME)); + assertNotNull("Replica node " + ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID + " attribute is not set", remoteNode.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)); + } + + public void testRemoveNodeFromGroup() throws Exception + { + ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) createMaster(); + + String node2Name = TEST_NODE_NAME + "_2"; + String node2NodeHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1); + ReplicatedEnvironmentFacade ref2 = addReplica(node2Name, node2NodeHostPort); + + List> groupMembers = environmentFacade.getGroupMembers(); + assertEquals("Unexpected group members count", 2, groupMembers.size()); + ref2.close(); + + environmentFacade.removeNodeFromGroup(node2Name); + groupMembers = environmentFacade.getGroupMembers(); + assertEquals("Unexpected group members count", 1, groupMembers.size()); + } + + public void testSetDesignatedPrimary() throws Exception + { + ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) createMaster(); + environmentFacade.setDesignatedPrimary(false); + assertFalse("Unexpected designated primary", environmentFacade.isDesignatedPrimary()); + } + + public void testGetNodePriority() throws Exception + { + assertEquals("Unexpected node priority", 1, ((ReplicatedEnvironmentFacade) createMaster()).getPriority()); + } + + public void testGetElectableGroupSizeOverride() throws Exception + { + assertEquals("Unexpected Electable Group Size Override", 0, ((ReplicatedEnvironmentFacade) createMaster()).getElectableGroupSizeOverride()); + } + + public void testEnvironmentRestartOnInsufficientReplicas() throws Exception + { + ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster(); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String replica1NodeName = TEST_NODE_NAME + "_1"; + String replica1NodeHostPort = "localhost:" + replica1Port; + ReplicatedEnvironmentFacade replica1 = addReplica(replica1NodeName, replica1NodeHostPort); + + int replica2Port = getNextAvailable(replica1Port + 1); + String replica2NodeName = TEST_NODE_NAME + "_2"; + String replica2NodeHostPort = "localhost:" + replica2Port; + ReplicatedEnvironmentFacade replica2 = addReplica(replica2NodeName, replica2NodeHostPort); + + String databaseName = "test"; + + DatabaseConfig dbConfig = createDatabase(master, databaseName); + + // close replicas + replica1.close(); + replica2.close(); + + Environment e = master.getEnvironment(); + Database db = master.getOpenDatabase(databaseName); + try + { + master.openDatabases(dbConfig, "test2"); + fail("Opening of new database without quorum should fail"); + } + catch(InsufficientReplicasException ex) + { + master.handleDatabaseException(null, ex); + } + + replica1 = addReplica(replica1NodeName, replica1NodeHostPort); + replica2 = addReplica(replica2NodeName, replica2NodeHostPort); + + // Need to poll to await the remote node updating itself + long timeout = System.currentTimeMillis() + 5000; + while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout) + { + Thread.sleep(200); + } + + assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(), + State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ); + + Environment e2 = master.getEnvironment(); + assertNotSame("Environment has not been restarted", e2, e); + + Database db1 = master.getOpenDatabase(databaseName); + assertNotSame("Database should be the re-created", db1, db); + } + + public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception + { + final CountDownLatch masterLatch = new CountDownLatch(1); + final AtomicInteger masterStateChangeCount = new AtomicInteger(); + final CountDownLatch unknownLatch = new CountDownLatch(1); + final AtomicInteger unknownStateChangeCount = new AtomicInteger(); + StateChangeListener stateChangeListener = new StateChangeListener() + { + @Override + public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException + { + if (stateChangeEvent.getState() == State.MASTER) + { + masterStateChangeCount.incrementAndGet(); + masterLatch.countDown(); + } + else if (stateChangeEvent.getState() == State.UNKNOWN) + { + unknownStateChangeCount.incrementAndGet(); + unknownLatch.countDown(); + } + } + }; + + addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener()); + assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + int replica2Port = getNextAvailable(replica1Port + 1); + String node2NodeHostPort = "localhost:" + replica2Port; + + ReplicatedEnvironmentFacade replica1 = addReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); + ReplicatedEnvironmentFacade replica2 = addReplica(TEST_NODE_NAME + "_2", node2NodeHostPort); + + // close replicas + replica1.close(); + replica2.close(); + + assertTrue("Environment should be recreated and go into unknown state", + unknownLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); + + assertEquals("Node made master an unexpected number of times", 1, masterStateChangeCount.get()); + assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get()); + } + + public void testCloseStateTransitions() throws Exception + { + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster(); + + assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState()); + replicatedEnvironmentFacade.close(); + assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState()); + } + + private ReplicatedEnvironmentFacade createMaster() throws Exception + { + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener()); + assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + return env; + } + + private ReplicatedEnvironmentFacade addReplica(String nodeName, String nodeHostPort) throws Exception + { + return addReplica(nodeName, nodeHostPort, new NoopReplicationGroupListener()); + } + + private ReplicatedEnvironmentFacade addReplica(String nodeName, String nodeHostPort, ReplicationGroupListener replicationGroupListener) + throws Exception + { + TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA); + ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, replicationGroupListener); + assertTrue("Replica " + nodeName + " was not started", testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + return replicaEnvironmentFacade; + } + private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary, + State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) + { + + ReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary); + ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(node, _remoteReplicationNodeFactory); + ref.setReplicationGroupListener(replicationGroupListener); + ref.setStateChangeListener(stateChangeListener); + _nodes.put(nodeName, ref); + return ref; + } + + private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener groupChangeListener) + { + return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, groupChangeListener); + } + + private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName) throws AMQStoreException + { + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + environmentFacade.openDatabases(dbConfig, databaseName); + return dbConfig; + } + + private ReplicationNode createReplicationNodeMock(String nodeName, String nodeHostPort, boolean designatedPrimary) + { + ReplicationNode node = mock(ReplicationNode.class); + when(node.getAttribute(NAME)).thenReturn(nodeName); + when(node.getName()).thenReturn(nodeName); + when(node.getAttribute(HOST_PORT)).thenReturn(nodeHostPort); + when(node.getAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary); + when(node.getAttribute(GROUP_NAME)).thenReturn(TEST_GROUP_NAME); + when(node.getAttribute(HELPER_HOST_PORT)).thenReturn(TEST_NODE_HELPER_HOST_PORT); + when(node.getAttribute(DURABILITY)).thenReturn(TEST_DURABILITY); + when(node.getAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC); + + Map repConfig = new HashMap(); + repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s"); + repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); + when(node.getAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig); + + when(node.getAttribute(STORE_PATH)).thenReturn(new File(_storePath, nodeName).getAbsolutePath()); + return node; + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java new file mode 100644 index 0000000000..1e244e1f89 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; +import com.sleepycat.je.rep.ReplicatedEnvironment.State; + +class TestStateChangeListener implements StateChangeListener +{ + private final State _expectedState; + private final CountDownLatch _latch; + + public TestStateChangeListener(State expectedState) + { + _expectedState = expectedState; + _latch = new CountDownLatch(1); + } + + @Override + public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException + { + if (stateChangeEvent.getState() == _expectedState) + { + _latch.countDown(); + } + } + + public boolean awaitForStateChange(long timeout, TimeUnit timeUnit) throws InterruptedException + { + return _latch.await(timeout, timeUnit); + } +} \ No newline at end of file diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java index c190498a36..6bddf5876f 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -38,6 +38,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index f8b0fb362c..d7b27baa4e 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -19,6 +19,7 @@ */ package org.apache.qpid.server.store.berkeleydb; +import java.io.File; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; @@ -41,6 +42,7 @@ import javax.jms.Connection; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.ReplicationNode; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -69,7 +71,7 @@ public class HATestClusterCreator private final String _virtualHostName; private final String _ipAddressOfBroker; - private final String _groupName ; + private final String _groupName; private final int _numberOfNodes; private int _bdbHelperPort; private int _primaryBrokerPort; @@ -350,6 +352,7 @@ public class HATestClusterCreator { replicationNodeAttributes.put(ReplicationNode.REPLICATION_PARAMETERS, replicationParameters); } + replicationNodeAttributes.put(ReplicationNode.STORE_PATH, System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + nodeName); // ha virtual host Map virtualHostAttributes = new HashMap(); -- cgit v1.2.1