summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-07-04 01:16:07 +0000
committerAlex Rudyy <orudyy@apache.org>2014-07-04 01:16:07 +0000
commita2dfed6abeaad71e69d9d73c6db42b47d7d93c66 (patch)
tree3d985f74b3e8c434bb65c6517093e944b2ba8b34
parent29481e51fdbd1a87c7ec75eac8e60ba93028e123 (diff)
downloadqpid-python-a2dfed6abeaad71e69d9d73c6db42b47d7d93c66.tar.gz
QPID-5867: Add intruder protection functionality for a cluster of BDB HA virtual host nodes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1607772 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/HASettings.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java1
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java260
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java43
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java3
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java124
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java202
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java116
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java60
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java43
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java2
19 files changed, 874 insertions, 28 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/HASettings.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/HASettings.java
index 31e9987182..fd4a7bc1c7 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/HASettings.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/HASettings.java
@@ -34,4 +34,6 @@ public interface HASettings extends FileBasedSettings
int getPriority();
int getQuorumOverride();
+
+ String getHelperNodeName();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
index 90fb086dc5..ae0d4b13ae 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
@@ -34,4 +34,5 @@ public interface ReplicatedEnvironmentConfiguration extends StandardEnvironmentC
int getPriority();
int getQuorumOverride();
Map<String, String> getReplicationParameters();
+ String getHelperNodeName();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index dff5fc372d..8b9039c792 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.store.berkeleydb.replication;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -33,6 +34,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -47,11 +49,13 @@ import com.sleepycat.je.Sequence;
import com.sleepycat.je.SequenceConfig;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
import org.apache.qpid.server.util.DaemonThreadFactory;
+import org.codehaus.jackson.map.ObjectMapper;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -63,10 +67,13 @@ import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
+import com.sleepycat.je.rep.AppStateMonitor;
import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.InsufficientAcksException;
import com.sleepycat.je.rep.InsufficientReplicasException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.NodeType;
import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.RepInternal;
import com.sleepycat.je.rep.ReplicatedEnvironment;
@@ -143,6 +150,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}});
public static final String TYPE = "BDB-HA";
+ private static final String PERMITTED_NODE_LIST = "permittedNodes";
private final ReplicatedEnvironmentConfiguration _configuration;
private final String _prettyGroupNodeName;
@@ -165,6 +173,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
+ private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>();
public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
{
@@ -190,7 +199,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
_groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
-
// create environment in a separate thread to avoid renaming of the current thread by JE
_environment = createEnvironment(true);
populateExistingRemoteReplicationNodes();
@@ -298,7 +306,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public DatabaseException handleDatabaseException(String contextMessage, final DatabaseException dbe)
{
- boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException || dbe instanceof RestartRequiredException);
+ boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientAcksException || dbe instanceof RestartRequiredException);
if (restart)
{
tryToRestartEnvironment(dbe);
@@ -821,6 +829,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_environment = createEnvironment(false);
+ registerAppStateMonitorIfPermittedNodesSpecified();
+
if (_stateChangeListener.get() != null)
{
_environment.setStateChangeListener(this);
@@ -935,25 +945,34 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
boolean designatedPrimary = _configuration.isDesignatedPrimary();
int priority = _configuration.getPriority();
int quorumOverride = _configuration.getQuorumOverride();
+ String nodeName = _configuration.getName();
+ String helperNodeName = _configuration.getHelperNodeName();
if (LOGGER.isInfoEnabled())
{
LOGGER.info("Creating environment");
LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath());
LOGGER.info("Group name " + groupName);
- LOGGER.info("Node name " + _configuration.getName());
+ LOGGER.info("Node name " + nodeName);
LOGGER.info("Node host port " + hostPort);
LOGGER.info("Helper host port " + helperHostPort);
+ LOGGER.info("Helper host name " + helperNodeName);
LOGGER.info("Durability " + _defaultDurability);
LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
LOGGER.info("Node priority " + priority);
LOGGER.info("Quorum override " + quorumOverride);
+ LOGGER.info("Permitted node list " + _permittedNodes);
+ }
+
+ if (helperNodeName != null && _permittedNodes.isEmpty() && !helperHostPort.equals(hostPort))
+ {
+ connectToHelperNodeAndCheckPermittedHosts(helperNodeName, helperHostPort, hostPort);
}
Map<String, String> replicationEnvironmentParameters = new HashMap<>(ReplicatedEnvironmentFacade.REPCONFIG_DEFAULTS);
replicationEnvironmentParameters.putAll(_configuration.getReplicationParameters());
- ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _configuration.getName(), hostPort);
+ ReplicationConfig replicationConfig = new ReplicationConfig(groupName, nodeName, hostPort);
replicationConfig.setHelperHosts(helperHostPort);
replicationConfig.setDesignatedPrimary(designatedPrimary);
replicationConfig.setNodePriority(priority);
@@ -1111,6 +1130,144 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ public void setPermittedNodes(Collection<String> permittedNodes)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Setting permitted nodes to " + permittedNodes);
+ }
+
+ _permittedNodes.clear();
+ if (permittedNodes != null)
+ {
+ _permittedNodes.addAll(permittedNodes);
+ registerAppStateMonitorIfPermittedNodesSpecified();
+
+ ReplicationGroupListener listener = _replicationGroupListener.get();
+ for(ReplicationNode node: _remoteReplicationNodes.values())
+ {
+ if (!isNodePermitted(node))
+ {
+ onIntruder(listener, node);
+ }
+ }
+ }
+ }
+
+ private void registerAppStateMonitorIfPermittedNodesSpecified()
+ {
+ if (!_permittedNodes.isEmpty())
+ {
+ byte[] data = permittedNodeListToBytes(_permittedNodes);
+ _environment.registerAppStateMonitor(new EnvironmentStateHolder(data));
+ }
+ }
+
+ private boolean isNodePermitted(ReplicationNode replicationNode)
+ {
+ if (_permittedNodes.isEmpty())
+ {
+ return true;
+ }
+
+ String nodeHostPort = getHostPort(replicationNode);
+ return _permittedNodes.contains(nodeHostPort);
+ }
+
+ private String getHostPort(ReplicationNode replicationNode)
+ {
+ return replicationNode.getHostName() + ":" + replicationNode.getPort();
+ }
+
+
+ private void onIntruder(ReplicationGroupListener replicationGroupListener, ReplicationNode replicationNode)
+ {
+ if (replicationGroupListener != null)
+ {
+ replicationGroupListener.onIntruderNode(replicationNode);
+ }
+ else
+ {
+ LOGGER.warn(String.format("Found an intruder node '%s' from ''%s' . The node is not listed in permitted list: %s",
+ replicationNode.getName(), getHostPort(replicationNode), String.valueOf(_permittedNodes)));
+ }
+ }
+
+ private byte[] permittedNodeListToBytes(Set<String> permittedNodeList)
+ {
+ HashMap<String, Object> data = new HashMap<String, Object>();
+ data.put(PERMITTED_NODE_LIST, permittedNodeList);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ objectMapper.writeValue(baos, data);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Unexpected exception on serializing of permitted node list into json", e);
+ }
+ return baos.toByteArray();
+ }
+
+ Collection<String> bytesToPermittedNodeList(byte[] applicationState)
+ {
+ if (applicationState == null || applicationState.length == 0)
+ {
+ return Collections.emptySet();
+ }
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ Map<String, Object> settings = objectMapper.readValue(applicationState, Map.class);
+ return (Collection<String>)settings.get(PERMITTED_NODE_LIST);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Unexpected exception on de-serializing of application state", e);
+ }
+ }
+
+ private void connectToHelperNodeAndCheckPermittedHosts(String helperNodeName, String helperHostPort, String hostPort)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug(String.format("Requesting state of the node '%s' at '%s'", helperNodeName, helperHostPort));
+ }
+
+ Collection<String> permittedNodes = null;
+ try
+ {
+ NodeState state = getRemoteNodeState(new ReplicationNodeImpl(helperNodeName, helperHostPort));
+ byte[] applicationState = state.getAppState();
+ permittedNodes = bytesToPermittedNodeList(applicationState);
+ }
+ catch (IOException e)
+ {
+ throw new IllegalConfigurationException(String.format("Cannot connect to '%s'", helperHostPort), e);
+ }
+ catch (ServiceConnectFailedException e)
+ {
+ throw new IllegalConfigurationException(String.format("Failure to connect to '%s'", helperHostPort), e);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(String.format("Unexpected exception on attempt to retrieve state from '%s' at '%s'",
+ helperNodeName, helperHostPort), e);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug(String.format("Attribute 'permittedNodes' on node '%s' is set to '%s'", helperNodeName, String.valueOf(permittedNodes)));
+ }
+
+ if (permittedNodes != null && !permittedNodes.isEmpty() && !permittedNodes.contains(hostPort))
+ {
+ throw new IllegalConfigurationException(String.format("Node from '%s' is not permitted!", hostPort));
+ }
+ }
+
private void populateExistingRemoteReplicationNodes()
{
ReplicationGroup group = _environment.getGroup();
@@ -1157,7 +1314,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
Map<ReplicationNode, NodeState> nodeStates = discoverNodeStates(_remoteReplicationNodes.values());
- executeDabasePingerOnNodeChangesIfMaster(nodeStates);
+ executeDatabasePingerOnNodeChangesIfMaster(nodeStates);
notifyGroupListenerAboutNodeStates(nodeStates);
}
@@ -1187,7 +1344,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
if (env != null)
{
ReplicationGroup group = env.getGroup();
- Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes());
+ Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getNodes());
String localNodeName = getNodeName();
Map<String, ReplicationNode> removalMap = new HashMap<String, ReplicationNode>(_remoteReplicationNodes);
@@ -1205,9 +1362,16 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_remoteReplicationNodes.put(discoveredNodeName, replicationNode);
- if (replicationGroupListener != null)
+ if (isNodePermitted(replicationNode))
{
- replicationGroupListener.onReplicationNodeAddedToGroup(replicationNode);
+ if (replicationGroupListener != null)
+ {
+ replicationGroupListener.onReplicationNodeAddedToGroup(replicationNode);
+ }
+ }
+ else
+ {
+ onIntruder(replicationGroupListener, replicationNode);
}
}
else
@@ -1288,7 +1452,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return nodeStates;
}
- private void executeDabasePingerOnNodeChangesIfMaster(final Map<ReplicationNode, NodeState> nodeStates)
+ private void executeDatabasePingerOnNodeChangesIfMaster(final Map<ReplicationNode, NodeState> nodeStates)
{
if (ReplicatedEnvironment.State.MASTER == _environment.getState())
{
@@ -1330,4 +1494,82 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
CLOSING,
CLOSED
}
+
+ private static class EnvironmentStateHolder implements AppStateMonitor
+ {
+ private byte[] _data;
+
+ private EnvironmentStateHolder(byte[] data)
+ {
+ this._data = data;
+ }
+
+ @Override
+ public byte[] getAppState()
+ {
+ return _data;
+ }
+ }
+
+ static class ReplicationNodeImpl implements ReplicationNode
+ {
+
+ private final InetSocketAddress _address;
+ private final String _nodeName;
+ private final String _host;
+ private final int _port;
+
+ public ReplicationNodeImpl(String nodeName, String hostPort)
+ {
+ String[] tokens = hostPort.split(":");
+ if (tokens.length != 2)
+ {
+ throw new IllegalArgumentException("Unexpected host port value :" + hostPort);
+ }
+ _host = tokens[0];
+ _port = Integer.parseInt(tokens[1]);
+ _nodeName = nodeName;
+ _address = new InetSocketAddress(_host, _port);
+ }
+
+ @Override
+ public String getName()
+ {
+ return _nodeName;
+ }
+
+ @Override
+ public NodeType getType()
+ {
+ return NodeType.ELECTABLE;
+ }
+
+ @Override
+ public InetSocketAddress getSocketAddress()
+ {
+ return _address;
+ }
+
+ @Override
+ public String getHostName()
+ {
+ return _host;
+ }
+
+ @Override
+ public int getPort()
+ {
+ return _port;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ReplicationNodeImpl{" +
+ "_nodeName='" + _nodeName + '\'' +
+ ", _host='" + _host + '\'' +
+ ", _port=" + _port +
+ '}';
+ }
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index d7b01bc829..f67d232b9f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -66,6 +66,12 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
}
@Override
+ public String getHelperNodeName()
+ {
+ return settings.getHelperNodeName();
+ }
+
+ @Override
public int getQuorumOverride()
{
return settings.getQuorumOverride();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java
index e88b9e8651..7f9f0fcf64 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java
@@ -48,4 +48,9 @@ public interface ReplicationGroupListener
*/
void onNodeState(ReplicationNode node, NodeState nodeState);
+ /**
+ * Invoked on intruder node detected
+ */
+ void onIntruderNode(ReplicationNode node);
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
index f231638426..477ffb5a64 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.virtualhost.berkeleydb;
+import java.util.List;
+
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.DerivedAttribute;
import org.apache.qpid.server.model.ManagedAttribute;
@@ -34,6 +36,7 @@ public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends Virtual
String COALESCING_SYNC = "coalescingSync";
String DURABILITY = "durability";
String STORE_PATH = "storePath";
+ String PERMITTED_NODES = "permittedNodes";
@ManagedAttribute( defaultValue = "SYNC")
String getLocalTransactionSynchronizationPolicy();
@@ -52,4 +55,7 @@ public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends Virtual
@ManagedAttribute(mandatory = true, defaultValue = "0")
Long getStoreOverfullSize();
+
+ @ManagedAttribute
+ List<String> getPermittedNodes();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
index ba210b470c..aaa5f6aca4 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.virtualhost.berkeleydb;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -54,6 +55,9 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
@ManagedAttributeField
private Long _storeOverfullSize;
+ @ManagedAttributeField(afterSet = "applyPermittedNodes")
+ private List<String> _permittedNodes;
+
@ManagedObjectFactoryConstructor
public BDBHAVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
@@ -127,6 +131,31 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSynchronizationPolicy();
validateTransactionSynchronizationPolicy(policy);
}
+
+ if(changedAttributes.contains(PERMITTED_NODES))
+ {
+
+ List<String> permittedNodes = ((BDBHAVirtualHost<?>)proxyForValidation).getPermittedNodes();
+ if (permittedNodes != null)
+ {
+ for (String permittedNode: permittedNodes)
+ {
+ String[] tokens = permittedNode.split(":");
+ if (tokens.length != 2)
+ {
+ throw new IllegalArgumentException(String.format("Invalid permitted node specified '%s'. ", permittedNode));
+ }
+ try
+ {
+ Integer.parseInt(tokens[1]);
+ }
+ catch(Exception e)
+ {
+ throw new IllegalArgumentException(String.format("Invalid port is specified in permitted node '%s'. ", permittedNode));
+ }
+ }
+ }
+ }
}
private void validateTransactionSynchronizationPolicy(String policy)
@@ -158,4 +187,18 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
return _storeOverfullSize;
}
+ @Override
+ public List<String> getPermittedNodes()
+ {
+ return _permittedNodes;
+ }
+
+ protected void applyPermittedNodes()
+ {
+ ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
+ if (facade != null)
+ {
+ facade.setPermittedNodes(getPermittedNodes());
+ }
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java
index 691096f59f..10b2b13bc9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java
@@ -33,6 +33,7 @@ public interface BDBHARemoteReplicationNode<X extends BDBHARemoteReplicationNode
String ROLE = "role";
String LAST_KNOWN_REPLICATION_TRANSACTION_ID = "lastKnownReplicationTransactionId";
String JOIN_TIME = "joinTime";
+ String MONITOR = "monitor";
@DerivedAttribute
String getGroupName();
@@ -49,4 +50,6 @@ public interface BDBHARemoteReplicationNode<X extends BDBHARemoteReplicationNode
@DerivedAttribute
long getLastKnownReplicationTransactionId();
+ @DerivedAttribute
+ boolean isMonitor();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
index 4b5683b794..5327997498 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
@@ -54,6 +54,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
private volatile String _role;
private final AtomicReference<State> _state;
+ private final boolean _isMonitor;
public BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNode<?> virtualHostNode, Map<String, Object> attributes, ReplicatedEnvironmentFacade replicatedEnvironmentFacade)
{
@@ -61,6 +62,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
_address = (String)attributes.get(ADDRESS);
_replicatedEnvironmentFacade = replicatedEnvironmentFacade;
_state = new AtomicReference<State>(State.ACTIVE);
+ _isMonitor = (Boolean)attributes.get(MONITOR);
}
@Override
@@ -100,6 +102,12 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
}
@Override
+ public boolean isMonitor()
+ {
+ return _isMonitor;
+ }
+
+ @Override
public void deleted()
{
super.deleted();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
index 5e738e7701..ad12dc2447 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.virtualhostnode.berkeleydb;
-import java.util.Map;
-
import org.apache.qpid.server.model.DerivedAttribute;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.store.berkeleydb.HASettings;
@@ -38,6 +36,7 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends
public static final String ROLE = "role";
public static final String LAST_KNOWN_REPLICATION_TRANSACTION_ID = "lastKnownReplicationTransactionId";
public static final String JOIN_TIME = "joinTime";
+ public static final String HELPER_NODE_NAME = "helperNodeName";
@ManagedAttribute(mandatory=true)
String getGroupName();
@@ -65,4 +64,7 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends
@DerivedAttribute
Long getJoinTime();
+
+ @ManagedAttribute(persist = false)
+ String getHelperNodeName();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index cf1cea3efa..3868025096 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -36,6 +36,7 @@ import javax.security.auth.Subject;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.NodeType;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.StateChangeEvent;
@@ -44,6 +45,8 @@ import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
import com.sleepycat.je.rep.utilint.HostPortPair;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.updater.Task;
+import org.apache.qpid.server.configuration.updater.VoidTask;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
@@ -107,6 +110,9 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
@ManagedAttributeField(afterSet="postSetRole")
private String _role;
+ @ManagedAttributeField
+ private String _helperNodeName;
+
@ManagedObjectFactoryConstructor
public BDBHAVirtualHostNodeImpl(Map<String, Object> attributes, Broker<?> broker)
{
@@ -202,6 +208,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
return -1L;
}
+ @Override
+ public String getHelperNodeName()
+ {
+ return _helperNodeName;
+ }
+
@SuppressWarnings("rawtypes")
@Override
public Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes()
@@ -260,6 +272,11 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.STORE_LOCATION(getStorePath()));
ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) getConfigurationStore().getEnvironmentFacade();
+ if (environmentFacade == null)
+ {
+ throw new IllegalStateException("Environment facade is not created");
+ }
+
if (_environmentFacade.compareAndSet(null, environmentFacade))
{
environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
@@ -276,11 +293,16 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
finally
{
- ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
- if (_environmentFacade.compareAndSet(environmentFacade, null))
- {
- environmentFacade.close();
- }
+ stopEnvironment();
+ }
+ }
+
+ private void stopEnvironment()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
+ if (_environmentFacade.compareAndSet(environmentFacade, null))
+ {
+ environmentFacade.close();
}
}
@@ -598,7 +620,20 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
private class RemoteNodesDiscoverer implements ReplicationGroupListener
{
@Override
- public void onReplicationNodeAddedToGroup(ReplicationNode node)
+ public void onReplicationNodeAddedToGroup(final ReplicationNode node)
+ {
+ getTaskExecutor().submit(new Task<Void>()
+ {
+ @Override
+ public Void execute()
+ {
+ addRemoteReplicationNode(node);
+ return null;
+ }
+ });
+ }
+
+ private void addRemoteReplicationNode(ReplicationNode node)
{
BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade());
remoteNode.create();
@@ -606,14 +641,40 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
@Override
- public void onReplicationNodeRecovered(ReplicationNode node)
+ public void onReplicationNodeRecovered(final ReplicationNode node)
+ {
+ getTaskExecutor().submit(new Task<Void>()
+ {
+ @Override
+ public Void execute()
+ {
+ recoverRemoteReplicationNode(node);
+ return null;
+ }
+ });
+ }
+
+ private void recoverRemoteReplicationNode(ReplicationNode node)
{
BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade());
remoteNode.open();
}
@Override
- public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+ public void onReplicationNodeRemovedFromGroup(final ReplicationNode node)
+ {
+ getTaskExecutor().submit(new Task<Void>()
+ {
+ @Override
+ public Void execute()
+ {
+ removeRemoteReplicationNode(node);
+ return null;
+ }
+ });
+ }
+
+ private void removeRemoteReplicationNode(ReplicationNode node)
{
BDBHARemoteReplicationNodeImpl remoteNode = getChildByName(BDBHARemoteReplicationNodeImpl.class, node.getName());
if (remoteNode != null)
@@ -634,11 +695,55 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
else
{
- remoteNode.setRole(nodeState.getNodeState().name());
remoteNode.setJoinTime(nodeState.getJoinTime());
remoteNode.setLastTransactionId(nodeState.getCurrentTxnEndVLSN());
+ remoteNode.setRole(nodeState.getNodeState().name());
+ }
+ }
+ }
+
+ @Override
+ public void onIntruderNode(ReplicationNode node)
+ {
+ boolean inManagementMode = getParent(Broker.class).isManagementMode();
+ if (inManagementMode)
+ {
+ LOGGER.warn(String.format("Intruder replication node '%s' from '%s' is detected. Ignoring intruder in management mode",
+ node.getName(), node.getHostName() + ":" + node.getPort() ));
+ BDBHARemoteReplicationNodeImpl remoteNode = getChildByName(BDBHARemoteReplicationNodeImpl.class, node.getName());
+ if (remoteNode == null)
+ {
+ addRemoteReplicationNode(node);
}
}
+ else
+ {
+ LOGGER.error(String.format("Intruder node '%s' from '%s' is detected. Stopping down virtual host node '%s'",
+ node.getName(), node.getHostName() + ":" + node.getPort(), BDBHAVirtualHostNodeImpl.this.toString() ));
+
+ getTaskExecutor().submit(new Task<Void>()
+ {
+ @Override
+ public Void execute()
+ {
+ State state = getState();
+
+ if (state == State.ACTIVE)
+ {
+ try
+ {
+ stopAndSetStateTo(State.ERRORED);
+ }
+ finally
+ {
+ stopEnvironment();
+ }
+ notifyStateChanged(state, State.ERRORED);
+ }
+ return null;
+ }
+ });
+ }
}
private Map<String, Object> nodeToAttributes(ReplicationNode replicationNode)
@@ -647,6 +752,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
attributes.put(ConfiguredObject.NAME, replicationNode.getName());
attributes.put(ConfiguredObject.DURABLE, false);
attributes.put(BDBHARemoteReplicationNode.ADDRESS, replicationNode.getHostName() + ":" + replicationNode.getPort());
+ attributes.put(BDBHARemoteReplicationNode.MONITOR, replicationNode.getType() == NodeType.MONITOR);
return attributes;
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index 80f6e7ea49..d830d488db 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -23,11 +23,13 @@ package org.apache.qpid.server.store.berkeleydb;
import static org.mockito.Mockito.when;
import java.io.File;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -38,8 +40,13 @@ import java.util.concurrent.atomic.AtomicReference;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationConfig;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.MessageLogger;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfigurationChangeListener;
@@ -505,6 +512,201 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
}
+ public void testIntruderProtection() throws Exception
+ {
+ int node1PortNumber = findFreePort();
+ String helperAddress = "localhost:" + node1PortNumber;
+ String groupName = "group";
+
+ Map<String, Object> node1Attributes = new HashMap<String, Object>();
+ node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1");
+ node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress);
+ node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1");
+
+ BDBHAVirtualHostNode<?> node1 = createAndStartHaVHN(node1Attributes);
+ BDBHAVirtualHost<?> host = (BDBHAVirtualHost<?>)node1.getVirtualHost();
+
+ List<String> permittedNodes = new ArrayList<String>();
+ int node2PortNumber = getNextAvailable(node1PortNumber+1);
+ permittedNodes.add(helperAddress);
+ permittedNodes.add("localhost:" + node2PortNumber);
+ host.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.PERMITTED_NODES, permittedNodes));
+
+ Map<String, Object> node2Attributes = new HashMap<String, Object>();
+ node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2");
+ node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node2PortNumber);
+ node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2");
+ node2Attributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, "node1");
+ node2Attributes.put(BDBHAVirtualHostNode.PRIORITY, 0);
+
+ BDBHAVirtualHostNode<?> node2 = createAndStartHaVHN(node2Attributes);
+
+ int node3PortNumber = getNextAvailable(node2PortNumber+1);
+ Map<String, Object> node3Attributes = new HashMap<String, Object>();
+ node3Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node3Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node3Attributes.put(BDBHAVirtualHostNode.NAME, "node3");
+ node3Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node3Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node3PortNumber);
+ node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "3");
+ node3Attributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, "node1");
+ node3Attributes.put(BDBHAVirtualHostNode.PRIORITY, 0);
+
+ try
+ {
+ createHaVHN(node3Attributes);
+ fail("The VHN should not be permitted to join the group");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ assertEquals("Unexpected exception message", String.format("Node from '%s' is not permitted!", "localhost:" + node3PortNumber), e.getMessage());
+ }
+
+ // join node by skipping a step retrieving node state and checking the permitted hosts
+ node3Attributes.remove(BDBHAVirtualHostNode.HELPER_NODE_NAME);
+
+ final CountDownLatch stopLatch = new CountDownLatch(1);
+ ConfigurationChangeListener listener = new NoopConfigurationChangeListener()
+ {
+ @Override
+ public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)
+ {
+ if (newState == State.ERRORED)
+ {
+ stopLatch.countDown();
+ }
+ }
+ };
+ node1.addChangeListener(listener);
+
+ createHaVHN(node3Attributes);
+
+ assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(10, TimeUnit.SECONDS));
+ }
+
+ public void testIntruderProtectionInManagementMode() throws Exception
+ {
+ int node1PortNumber = findFreePort();
+ String helperAddress = "localhost:" + node1PortNumber;
+ String groupName = "group";
+
+ Map<String, Object> node1Attributes = new HashMap<String, Object>();
+ node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1");
+ node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress);
+ node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1");
+
+ BDBHAVirtualHostNode<?> node1 = createAndStartHaVHN(node1Attributes);
+
+ int node2PortNumber = getNextAvailable(node1PortNumber+1);
+ Map<String, Object> node2Attributes = new HashMap<String, Object>();
+ node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2");
+ node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node2PortNumber);
+ node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2");
+ node2Attributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, "node1");
+ node2Attributes.put(BDBHAVirtualHostNode.PRIORITY, 0);
+
+ BDBHAVirtualHostNode<?> node2 = createAndStartHaVHN(node2Attributes);
+
+ final CountDownLatch stopLatch = new CountDownLatch(1);
+ ConfigurationChangeListener listener = new NoopConfigurationChangeListener()
+ {
+ @Override
+ public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)
+ {
+ if (newState == State.ERRORED)
+ {
+ stopLatch.countDown();
+ }
+ }
+ };
+ node1.addChangeListener(listener);
+
+ BDBHAVirtualHost<?> host = (BDBHAVirtualHost<?>)node1.getVirtualHost();
+
+ List<String> permittedNodes = new ArrayList<String>();
+ permittedNodes.add(helperAddress);
+ host.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.PERMITTED_NODES, permittedNodes));
+
+ assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(10, TimeUnit.SECONDS));
+
+ when(_broker.isManagementMode()).thenReturn(true);
+ node1.start();
+
+ awaitRemoteNodes(node1, 1);
+
+ BDBHARemoteReplicationNode<?> remote = findRemoteNode(node1, node2.getName());
+ remote.delete();
+ }
+
+ public void testIntruderConnectedBeforePermittedNodesAreSet() throws Exception
+ {
+ int node1PortNumber = findFreePort();
+ String helperAddress = "localhost:" + node1PortNumber;
+ String groupName = "group";
+
+ Map<String, Object> node1Attributes = new HashMap<String, Object>();
+ node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1");
+ node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress);
+ node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1");
+
+ BDBHAVirtualHostNode<?> node1 = createAndStartHaVHN(node1Attributes);
+
+ int node2PortNumber = getNextAvailable(node1PortNumber+1);
+ Map<String, Object> node2Attributes = new HashMap<String, Object>();
+ node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2");
+ node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node2PortNumber);
+ node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2");
+ node2Attributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, "node1");
+
+ createAndStartHaVHN(node2Attributes);
+
+ final CountDownLatch stopLatch = new CountDownLatch(1);
+ ConfigurationChangeListener listener = new NoopConfigurationChangeListener()
+ {
+ @Override
+ public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)
+ {
+ if (newState == State.ERRORED)
+ {
+ stopLatch.countDown();
+ }
+ }
+ };
+ node1.addChangeListener(listener);
+
+ BDBHAVirtualHost<?> host = (BDBHAVirtualHost<?>)node1.getVirtualHost();
+ List<String> permittedNodes = new ArrayList<String>();
+ permittedNodes.add(helperAddress);
+ host.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.PERMITTED_NODES, permittedNodes));
+
+ assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(20, TimeUnit.SECONDS));
+ }
+
private BDBHARemoteReplicationNode<?> findRemoteNode(BDBHAVirtualHostNode<?> node, String name)
{
for (RemoteReplicationNode<?> remoteNode : node.getRemoteReplicationNodes())
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 86d47a6a8b..89e618482b 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -24,14 +24,19 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -41,6 +46,7 @@ import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.ReplicatedEnvironment;
@@ -52,6 +58,7 @@ import com.sleepycat.je.rep.StateChangeListener;
public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
+ private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacadeTest.class);
private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
private static final int LISTENER_TIMEOUT = 5;
private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
@@ -644,6 +651,105 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
}
}
+ public void testSetPermittedNodes() throws Exception
+ {
+ ReplicatedEnvironmentFacade firstNode = createMaster();
+
+ Set<String> permittedNodes = new HashSet<String>();
+ permittedNodes.add("localhost:" + TEST_NODE_PORT);
+ permittedNodes.add("localhost:" + getNextAvailable(TEST_NODE_PORT + 1));
+ firstNode.setPermittedNodes(permittedNodes);
+
+ NodeState nodeState = firstNode.getRemoteNodeState(new ReplicatedEnvironmentFacade.ReplicationNodeImpl(TEST_NODE_NAME, TEST_NODE_HOST_PORT));
+
+ Collection<String> appStatePermittedNodes = firstNode.bytesToPermittedNodeList(nodeState.getAppState());
+ assertEquals("Unexpected permitted nodes", permittedNodes, new HashSet<String>(appStatePermittedNodes));
+ }
+
+ public void testPermittedNodeIsAllowedToConnect() throws Exception
+ {
+ ReplicatedEnvironmentFacade firstNode = createMaster();
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node1NodeHostPort = "localhost:" + replica1Port;
+
+ Set<String> permittedNodes = new HashSet<String>();
+ permittedNodes.add("localhost:" + TEST_NODE_PORT);
+ permittedNodes.add(node1NodeHostPort);
+ firstNode.setPermittedNodes(permittedNodes);
+
+ ReplicatedEnvironmentConfiguration configuration = createReplicatedEnvironmentConfiguration(TEST_NODE_NAME + "_1", node1NodeHostPort, false);
+ when(configuration.getHelperNodeName()).thenReturn(TEST_NODE_NAME);
+
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.REPLICA);
+ ReplicatedEnvironmentFacade secondNode = createReplicatedEnvironmentFacade(TEST_NODE_NAME + "_1",
+ stateChangeListener, new NoopReplicationGroupListener(), configuration);
+ assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+ assertEquals("Unexpected state", State.REPLICA.name(), secondNode.getNodeState());
+ }
+
+ public void testNotPermittedNodeIsNotAllowedToConnect() throws Exception
+ {
+ ReplicatedEnvironmentFacade firstNode = createMaster();
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node1NodeHostPort = "localhost:" + replica1Port;
+
+ Set<String> permittedNodes = new HashSet<String>();
+ permittedNodes.add("localhost:" + TEST_NODE_PORT);
+
+ firstNode.setPermittedNodes(permittedNodes);
+
+ ReplicatedEnvironmentConfiguration configuration = createReplicatedEnvironmentConfiguration(TEST_NODE_NAME + "_1", node1NodeHostPort, false);
+ when(configuration.getHelperNodeName()).thenReturn(TEST_NODE_NAME);
+
+ try
+ {
+ createReplicatedEnvironmentFacade(TEST_NODE_NAME + "_1", new TestStateChangeListener(State.REPLICA), new NoopReplicationGroupListener(), configuration);
+ fail("Node is not allowed to connect from " + node1NodeHostPort + " but environment was successfully created");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ assertEquals("Unexpected exception message", String.format("Node from '%s' is not permitted!",
+ node1NodeHostPort), e.getMessage());
+ }
+ }
+
+ public void testIntruderNodeIsDetected() throws Exception
+ {
+ final CountDownLatch intruderLatch = new CountDownLatch(1);
+ ReplicationGroupListener listener = new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onIntruderNode(ReplicationNode node)
+ {
+ intruderLatch.countDown();
+ }
+ };
+ ReplicatedEnvironmentFacade firstNode = createMaster(listener);
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node1NodeHostPort = "localhost:" + replica1Port;
+
+ Set<String> permittedNodes = new HashSet<String>();
+ permittedNodes.add("localhost:" + TEST_NODE_PORT);
+
+ firstNode.setPermittedNodes(permittedNodes);
+
+ String nodeName = TEST_NODE_NAME + "_1";
+ File environmentPathFile = new File(_storePath, nodeName);
+ environmentPathFile.mkdirs();
+
+ ReplicationConfig replicationConfig = new ReplicationConfig(TEST_GROUP_NAME, TEST_NODE_NAME + "_1", node1NodeHostPort);
+ replicationConfig.setHelperHosts(TEST_NODE_HOST_PORT);
+
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ envConfig.setDurability(TEST_DURABILITY);
+ ReplicatedEnvironment intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+ assertTrue("Intruder node was not detected", intruderLatch.await(10, TimeUnit.SECONDS));
+ }
+
private ReplicatedEnvironmentFacade createMaster() throws Exception
{
return createMaster(new NoopReplicationGroupListener());
@@ -678,6 +784,10 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
{
ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
+ return createReplicatedEnvironmentFacade(nodeName, stateChangeListener, replicationGroupListener, config);
+ }
+
+ private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodeName, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener, ReplicatedEnvironmentConfiguration config) {
ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
ref.setStateChangeListener(stateChangeListener);
ref.setReplicationGroupListener(replicationGroupListener);
@@ -735,5 +845,11 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
}
+ @Override
+ public void onIntruderNode(ReplicationNode node)
+ {
+ LOGGER.warn("Intruder node " + node);
+ }
+
}
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
index ac6ca45877..1b904c94a4 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
@@ -31,11 +31,16 @@ import java.util.Map;
import javax.servlet.http.HttpServletResponse;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
import org.apache.qpid.server.model.RemoteReplicationNode;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
import org.apache.qpid.systest.rest.Asserts;
@@ -136,7 +141,7 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
Map<String, Object> remoteNode1 = findRemoteNodeByName(remoteNodes, NODE1);
assertEquals("Node 1 observed from node 2 is in the wrong state",
- "UNAVAILABLE", remoteNode1.get(BDBHARemoteReplicationNode.STATE));
+ "UNAVAILABLE", remoteNode1.get(BDBHARemoteReplicationNode.STATE));
assertEquals("Node 1 observed from node 2 has the wrong role",
"UNKNOWN", remoteNode1.get(BDBHARemoteReplicationNode.ROLE));
@@ -240,8 +245,57 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
assertEquals("Unexpected number of remote nodes on " + NODE2, 1, data.size());
}
+ public void testIntruderProtection() throws Exception
+ {
+ createHANode(NODE1, _node1HaPort, _node1HaPort);
+ assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1);
+
+ String virtualHostRestUrl = "virtualhost/" + NODE1 + "/" + _hostName;
+
+ Map<String,Object> hostData = new HashMap<String,Object>();
+ hostData.put(BDBHAVirtualHost.PERMITTED_NODES, Arrays.asList( "localhost:" + _node1HaPort, "localhost:" + _node3HaPort));
+ getRestTestHelper().submitRequest(virtualHostRestUrl, "PUT", hostData, 200);
+
+ // add permitted node
+ Map<String, Object> node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort);
+ node3Data.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, NODE1);
+ getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, 201);
+ assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1);
+ assertRemoteNodes(NODE1, NODE3);
+
+ // try to add not permitted node
+ Map<String, Object> nodeData = createNodeAttributeMap(NODE2, _node2HaPort, _node1HaPort);
+ nodeData.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, NODE1);
+ getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "PUT", nodeData, 409);
+
+ assertRemoteNodes(NODE1, NODE3);
+
+ //connect intruder node
+ String nodeName = NODE2;
+ String nodeHostPort = (String)nodeData.get(BDBHAVirtualHostNode.ADDRESS);
+ File environmentPathFile = new File((String)nodeData.get(BDBHAVirtualHostNode.STORE_PATH), nodeName);
+ environmentPathFile.mkdirs();
+ ReplicationConfig replicationConfig = new ReplicationConfig((String)nodeData.get(BDBHAVirtualHostNode.GROUP_NAME), nodeName, nodeHostPort);
+ replicationConfig.setHelperHosts((String)nodeData.get(BDBHAVirtualHostNode.HELPER_ADDRESS));
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ envConfig.setDurability(Durability.parse((String)nodeData.get(BDBHAVirtualHostNode.DURABILITY)));
+ ReplicatedEnvironment intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+
+ waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHost.STATE, State.ERRORED.name());
+ }
+
private void createHANode(String nodeName, int nodePort, int helperPort) throws Exception
{
+ Map<String, Object> nodeData = createNodeAttributeMap(nodeName, nodePort, helperPort);
+
+ int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData);
+ assertEquals("Unexpected response code for virtual host node " + nodeName + " creation request", 201, responseCode);
+ }
+
+ private Map<String, Object> createNodeAttributeMap(String nodeName, int nodePort, int helperPort)
+ {
Map<String, Object> nodeData = new HashMap<String, Object>();
nodeData.put(BDBHAVirtualHostNode.NAME, nodeName);
nodeData.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
@@ -249,9 +303,7 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
nodeData.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName);
nodeData.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + nodePort);
nodeData.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + helperPort);
-
- int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData);
- assertEquals("Unexpected response code for virtual host node " + nodeName + " creation request", 201, responseCode);
+ return nodeData;
}
private void assertNode(String nodeName, int nodePort, int nodeHelperPort, String masterNode) throws Exception
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
index c1e2f59bd8..e0c03fe822 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.configuration.updater;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.Future;
public interface TaskExecutor
{
@@ -40,4 +41,6 @@ public interface TaskExecutor
<T> T run(Task<T> task) throws CancellationException;
+ <T> Future<T> submit(Task<T> task) throws CancellationException;
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index b140fc4a61..96e4e256b2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -117,7 +117,8 @@ public class TaskExecutorImpl implements TaskExecutor
}
}
- <T> Future<T> submit(Task<T> task)
+ @Override
+ public <T> Future<T> submit(Task<T> task)
{
checkState();
if (LOGGER.isDebugEnabled())
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
index 44bc923f0e..0cb69e4cd8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
@@ -88,7 +88,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
}
- @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED }, desiredState = State.ACTIVE )
+ @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
private void doActivate()
{
try
@@ -176,9 +176,14 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
@StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED }, desiredState = State.STOPPED )
protected void doStop()
{
+ stopAndSetStateTo(State.STOPPED);
+ }
+
+ protected void stopAndSetStateTo(State stoppedState)
+ {
closeChildren();
closeConfigurationStore();
- _state.set(State.STOPPED);
+ _state.set(stoppedState);
}
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
index 6e21e5325d..4343419505 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
@@ -21,6 +21,10 @@
package org.apache.qpid.server.configuration.updater;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
public class CurrentThreadTaskExecutor implements TaskExecutor
@@ -94,6 +98,45 @@ public class CurrentThreadTaskExecutor implements TaskExecutor
return task.execute();
}
+ @Override
+ public <T> Future<T> submit(Task<T> task) throws CancellationException
+ {
+ checkThread();
+ final T result = task.execute();
+ return new Future<T>()
+ {
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ return true;
+ }
+
+ @Override
+ public boolean isCancelled()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return true;
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException
+ {
+ return result;
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ return result;
+ }
+ };
+ }
+
public static TaskExecutor newStartedInstance()
{
TaskExecutor executor = new CurrentThreadTaskExecutor();
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
index 3eec242703..10f56cef58 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
@@ -463,7 +463,7 @@ public class JMXManagementPluginImpl
@Override
public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)
{
- if (newState == State.DELETED || newState == State.STOPPED)
+ if (newState == State.DELETED || newState == State.STOPPED || newState == State.ERRORED)
{
destroyObjectMBeans(object, newState == State.DELETED);
}