diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-07-04 01:16:07 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-07-04 01:16:07 +0000 |
| commit | a2dfed6abeaad71e69d9d73c6db42b47d7d93c66 (patch) | |
| tree | 3d985f74b3e8c434bb65c6517093e944b2ba8b34 | |
| parent | 29481e51fdbd1a87c7ec75eac8e60ba93028e123 (diff) | |
| download | qpid-python-a2dfed6abeaad71e69d9d73c6db42b47d7d93c66.tar.gz | |
QPID-5867: Add intruder protection functionality for a cluster of BDB HA virtual host nodes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1607772 13f79535-47bb-0310-9956-ffa450edef68
19 files changed, 874 insertions, 28 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/HASettings.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/HASettings.java index 31e9987182..fd4a7bc1c7 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/HASettings.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/HASettings.java @@ -34,4 +34,6 @@ public interface HASettings extends FileBasedSettings int getPriority(); int getQuorumOverride(); + + String getHelperNodeName(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java index 90fb086dc5..ae0d4b13ae 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java @@ -34,4 +34,5 @@ public interface ReplicatedEnvironmentConfiguration extends StandardEnvironmentC int getPriority(); int getQuorumOverride(); Map<String, String> getReplicationParameters(); + String getHelperNodeName(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index dff5fc372d..8b9039c792 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.store.berkeleydb.replication; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -33,6 +34,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -47,11 +49,13 @@ import com.sleepycat.je.Sequence; import com.sleepycat.je.SequenceConfig; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener; import org.apache.qpid.server.util.DaemonThreadFactory; +import org.codehaus.jackson.map.ObjectMapper; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -63,10 +67,13 @@ import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.EnvironmentFailureException; import com.sleepycat.je.Transaction; import com.sleepycat.je.TransactionConfig; +import com.sleepycat.je.rep.AppStateMonitor; import com.sleepycat.je.rep.InsufficientLogException; +import com.sleepycat.je.rep.InsufficientAcksException; import com.sleepycat.je.rep.InsufficientReplicasException; import com.sleepycat.je.rep.NetworkRestore; import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.NodeType; import com.sleepycat.je.rep.NodeState; import com.sleepycat.je.rep.RepInternal; import com.sleepycat.je.rep.ReplicatedEnvironment; @@ -143,6 +150,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan }}); public static final String TYPE = "BDB-HA"; + private static final String PERMITTED_NODE_LIST = "permittedNodes"; private final ReplicatedEnvironmentConfiguration _configuration; private final String _prettyGroupNodeName; @@ -165,6 +173,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>(); private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>(); + private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>(); public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration) { @@ -190,7 +199,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName)); _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName)); - // create environment in a separate thread to avoid renaming of the current thread by JE _environment = createEnvironment(true); populateExistingRemoteReplicationNodes(); @@ -298,7 +306,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public DatabaseException handleDatabaseException(String contextMessage, final DatabaseException dbe) { - boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException || dbe instanceof RestartRequiredException); + boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientAcksException || dbe instanceof RestartRequiredException); if (restart) { tryToRestartEnvironment(dbe); @@ -821,6 +829,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _environment = createEnvironment(false); + registerAppStateMonitorIfPermittedNodesSpecified(); + if (_stateChangeListener.get() != null) { _environment.setStateChangeListener(this); @@ -935,25 +945,34 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan boolean designatedPrimary = _configuration.isDesignatedPrimary(); int priority = _configuration.getPriority(); int quorumOverride = _configuration.getQuorumOverride(); + String nodeName = _configuration.getName(); + String helperNodeName = _configuration.getHelperNodeName(); if (LOGGER.isInfoEnabled()) { LOGGER.info("Creating environment"); LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath()); LOGGER.info("Group name " + groupName); - LOGGER.info("Node name " + _configuration.getName()); + LOGGER.info("Node name " + nodeName); LOGGER.info("Node host port " + hostPort); LOGGER.info("Helper host port " + helperHostPort); + LOGGER.info("Helper host name " + helperNodeName); LOGGER.info("Durability " + _defaultDurability); LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary); LOGGER.info("Node priority " + priority); LOGGER.info("Quorum override " + quorumOverride); + LOGGER.info("Permitted node list " + _permittedNodes); + } + + if (helperNodeName != null && _permittedNodes.isEmpty() && !helperHostPort.equals(hostPort)) + { + connectToHelperNodeAndCheckPermittedHosts(helperNodeName, helperHostPort, hostPort); } Map<String, String> replicationEnvironmentParameters = new HashMap<>(ReplicatedEnvironmentFacade.REPCONFIG_DEFAULTS); replicationEnvironmentParameters.putAll(_configuration.getReplicationParameters()); - ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _configuration.getName(), hostPort); + ReplicationConfig replicationConfig = new ReplicationConfig(groupName, nodeName, hostPort); replicationConfig.setHelperHosts(helperHostPort); replicationConfig.setDesignatedPrimary(designatedPrimary); replicationConfig.setNodePriority(priority); @@ -1111,6 +1130,144 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + public void setPermittedNodes(Collection<String> permittedNodes) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Setting permitted nodes to " + permittedNodes); + } + + _permittedNodes.clear(); + if (permittedNodes != null) + { + _permittedNodes.addAll(permittedNodes); + registerAppStateMonitorIfPermittedNodesSpecified(); + + ReplicationGroupListener listener = _replicationGroupListener.get(); + for(ReplicationNode node: _remoteReplicationNodes.values()) + { + if (!isNodePermitted(node)) + { + onIntruder(listener, node); + } + } + } + } + + private void registerAppStateMonitorIfPermittedNodesSpecified() + { + if (!_permittedNodes.isEmpty()) + { + byte[] data = permittedNodeListToBytes(_permittedNodes); + _environment.registerAppStateMonitor(new EnvironmentStateHolder(data)); + } + } + + private boolean isNodePermitted(ReplicationNode replicationNode) + { + if (_permittedNodes.isEmpty()) + { + return true; + } + + String nodeHostPort = getHostPort(replicationNode); + return _permittedNodes.contains(nodeHostPort); + } + + private String getHostPort(ReplicationNode replicationNode) + { + return replicationNode.getHostName() + ":" + replicationNode.getPort(); + } + + + private void onIntruder(ReplicationGroupListener replicationGroupListener, ReplicationNode replicationNode) + { + if (replicationGroupListener != null) + { + replicationGroupListener.onIntruderNode(replicationNode); + } + else + { + LOGGER.warn(String.format("Found an intruder node '%s' from ''%s' . The node is not listed in permitted list: %s", + replicationNode.getName(), getHostPort(replicationNode), String.valueOf(_permittedNodes))); + } + } + + private byte[] permittedNodeListToBytes(Set<String> permittedNodeList) + { + HashMap<String, Object> data = new HashMap<String, Object>(); + data.put(PERMITTED_NODE_LIST, permittedNodeList); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectMapper objectMapper = new ObjectMapper(); + try + { + objectMapper.writeValue(baos, data); + } + catch (Exception e) + { + throw new RuntimeException("Unexpected exception on serializing of permitted node list into json", e); + } + return baos.toByteArray(); + } + + Collection<String> bytesToPermittedNodeList(byte[] applicationState) + { + if (applicationState == null || applicationState.length == 0) + { + return Collections.emptySet(); + } + + ObjectMapper objectMapper = new ObjectMapper(); + try + { + Map<String, Object> settings = objectMapper.readValue(applicationState, Map.class); + return (Collection<String>)settings.get(PERMITTED_NODE_LIST); + } + catch (Exception e) + { + throw new RuntimeException("Unexpected exception on de-serializing of application state", e); + } + } + + private void connectToHelperNodeAndCheckPermittedHosts(String helperNodeName, String helperHostPort, String hostPort) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(String.format("Requesting state of the node '%s' at '%s'", helperNodeName, helperHostPort)); + } + + Collection<String> permittedNodes = null; + try + { + NodeState state = getRemoteNodeState(new ReplicationNodeImpl(helperNodeName, helperHostPort)); + byte[] applicationState = state.getAppState(); + permittedNodes = bytesToPermittedNodeList(applicationState); + } + catch (IOException e) + { + throw new IllegalConfigurationException(String.format("Cannot connect to '%s'", helperHostPort), e); + } + catch (ServiceConnectFailedException e) + { + throw new IllegalConfigurationException(String.format("Failure to connect to '%s'", helperHostPort), e); + } + catch (Exception e) + { + throw new RuntimeException(String.format("Unexpected exception on attempt to retrieve state from '%s' at '%s'", + helperNodeName, helperHostPort), e); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(String.format("Attribute 'permittedNodes' on node '%s' is set to '%s'", helperNodeName, String.valueOf(permittedNodes))); + } + + if (permittedNodes != null && !permittedNodes.isEmpty() && !permittedNodes.contains(hostPort)) + { + throw new IllegalConfigurationException(String.format("Node from '%s' is not permitted!", hostPort)); + } + } + private void populateExistingRemoteReplicationNodes() { ReplicationGroup group = _environment.getGroup(); @@ -1157,7 +1314,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan Map<ReplicationNode, NodeState> nodeStates = discoverNodeStates(_remoteReplicationNodes.values()); - executeDabasePingerOnNodeChangesIfMaster(nodeStates); + executeDatabasePingerOnNodeChangesIfMaster(nodeStates); notifyGroupListenerAboutNodeStates(nodeStates); } @@ -1187,7 +1344,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan if (env != null) { ReplicationGroup group = env.getGroup(); - Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes()); + Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getNodes()); String localNodeName = getNodeName(); Map<String, ReplicationNode> removalMap = new HashMap<String, ReplicationNode>(_remoteReplicationNodes); @@ -1205,9 +1362,16 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _remoteReplicationNodes.put(discoveredNodeName, replicationNode); - if (replicationGroupListener != null) + if (isNodePermitted(replicationNode)) { - replicationGroupListener.onReplicationNodeAddedToGroup(replicationNode); + if (replicationGroupListener != null) + { + replicationGroupListener.onReplicationNodeAddedToGroup(replicationNode); + } + } + else + { + onIntruder(replicationGroupListener, replicationNode); } } else @@ -1288,7 +1452,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return nodeStates; } - private void executeDabasePingerOnNodeChangesIfMaster(final Map<ReplicationNode, NodeState> nodeStates) + private void executeDatabasePingerOnNodeChangesIfMaster(final Map<ReplicationNode, NodeState> nodeStates) { if (ReplicatedEnvironment.State.MASTER == _environment.getState()) { @@ -1330,4 +1494,82 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan CLOSING, CLOSED } + + private static class EnvironmentStateHolder implements AppStateMonitor + { + private byte[] _data; + + private EnvironmentStateHolder(byte[] data) + { + this._data = data; + } + + @Override + public byte[] getAppState() + { + return _data; + } + } + + static class ReplicationNodeImpl implements ReplicationNode + { + + private final InetSocketAddress _address; + private final String _nodeName; + private final String _host; + private final int _port; + + public ReplicationNodeImpl(String nodeName, String hostPort) + { + String[] tokens = hostPort.split(":"); + if (tokens.length != 2) + { + throw new IllegalArgumentException("Unexpected host port value :" + hostPort); + } + _host = tokens[0]; + _port = Integer.parseInt(tokens[1]); + _nodeName = nodeName; + _address = new InetSocketAddress(_host, _port); + } + + @Override + public String getName() + { + return _nodeName; + } + + @Override + public NodeType getType() + { + return NodeType.ELECTABLE; + } + + @Override + public InetSocketAddress getSocketAddress() + { + return _address; + } + + @Override + public String getHostName() + { + return _host; + } + + @Override + public int getPort() + { + return _port; + } + + @Override + public String toString() + { + return "ReplicationNodeImpl{" + + "_nodeName='" + _nodeName + '\'' + + ", _host='" + _host + '\'' + + ", _port=" + _port + + '}'; + } + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index d7b01bc829..f67d232b9f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -66,6 +66,12 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact } @Override + public String getHelperNodeName() + { + return settings.getHelperNodeName(); + } + + @Override public int getQuorumOverride() { return settings.getQuorumOverride(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java index e88b9e8651..7f9f0fcf64 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java @@ -48,4 +48,9 @@ public interface ReplicationGroupListener */ void onNodeState(ReplicationNode node, NodeState nodeState); + /** + * Invoked on intruder node detected + */ + void onIntruderNode(ReplicationNode node); + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java index f231638426..477ffb5a64 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.virtualhost.berkeleydb; +import java.util.List; + import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.DerivedAttribute; import org.apache.qpid.server.model.ManagedAttribute; @@ -34,6 +36,7 @@ public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends Virtual String COALESCING_SYNC = "coalescingSync"; String DURABILITY = "durability"; String STORE_PATH = "storePath"; + String PERMITTED_NODES = "permittedNodes"; @ManagedAttribute( defaultValue = "SYNC") String getLocalTransactionSynchronizationPolicy(); @@ -52,4 +55,7 @@ public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends Virtual @ManagedAttribute(mandatory = true, defaultValue = "0") Long getStoreOverfullSize(); + + @ManagedAttribute + List<String> getPermittedNodes(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java index ba210b470c..aaa5f6aca4 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.virtualhost.berkeleydb; +import java.util.List; import java.util.Map; import java.util.Set; @@ -54,6 +55,9 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm @ManagedAttributeField private Long _storeOverfullSize; + @ManagedAttributeField(afterSet = "applyPermittedNodes") + private List<String> _permittedNodes; + @ManagedObjectFactoryConstructor public BDBHAVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) { @@ -127,6 +131,31 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSynchronizationPolicy(); validateTransactionSynchronizationPolicy(policy); } + + if(changedAttributes.contains(PERMITTED_NODES)) + { + + List<String> permittedNodes = ((BDBHAVirtualHost<?>)proxyForValidation).getPermittedNodes(); + if (permittedNodes != null) + { + for (String permittedNode: permittedNodes) + { + String[] tokens = permittedNode.split(":"); + if (tokens.length != 2) + { + throw new IllegalArgumentException(String.format("Invalid permitted node specified '%s'. ", permittedNode)); + } + try + { + Integer.parseInt(tokens[1]); + } + catch(Exception e) + { + throw new IllegalArgumentException(String.format("Invalid port is specified in permitted node '%s'. ", permittedNode)); + } + } + } + } } private void validateTransactionSynchronizationPolicy(String policy) @@ -158,4 +187,18 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm return _storeOverfullSize; } + @Override + public List<String> getPermittedNodes() + { + return _permittedNodes; + } + + protected void applyPermittedNodes() + { + ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); + if (facade != null) + { + facade.setPermittedNodes(getPermittedNodes()); + } + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java index 691096f59f..10b2b13bc9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java @@ -33,6 +33,7 @@ public interface BDBHARemoteReplicationNode<X extends BDBHARemoteReplicationNode String ROLE = "role"; String LAST_KNOWN_REPLICATION_TRANSACTION_ID = "lastKnownReplicationTransactionId"; String JOIN_TIME = "joinTime"; + String MONITOR = "monitor"; @DerivedAttribute String getGroupName(); @@ -49,4 +50,6 @@ public interface BDBHARemoteReplicationNode<X extends BDBHARemoteReplicationNode @DerivedAttribute long getLastKnownReplicationTransactionId(); + @DerivedAttribute + boolean isMonitor(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java index 4b5683b794..5327997498 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java @@ -54,6 +54,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB private volatile String _role; private final AtomicReference<State> _state; + private final boolean _isMonitor; public BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNode<?> virtualHostNode, Map<String, Object> attributes, ReplicatedEnvironmentFacade replicatedEnvironmentFacade) { @@ -61,6 +62,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB _address = (String)attributes.get(ADDRESS); _replicatedEnvironmentFacade = replicatedEnvironmentFacade; _state = new AtomicReference<State>(State.ACTIVE); + _isMonitor = (Boolean)attributes.get(MONITOR); } @Override @@ -100,6 +102,12 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB } @Override + public boolean isMonitor() + { + return _isMonitor; + } + + @Override public void deleted() { super.deleted(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java index 5e738e7701..ad12dc2447 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.virtualhostnode.berkeleydb; -import java.util.Map; - import org.apache.qpid.server.model.DerivedAttribute; import org.apache.qpid.server.model.ManagedAttribute; import org.apache.qpid.server.store.berkeleydb.HASettings; @@ -38,6 +36,7 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends public static final String ROLE = "role"; public static final String LAST_KNOWN_REPLICATION_TRANSACTION_ID = "lastKnownReplicationTransactionId"; public static final String JOIN_TIME = "joinTime"; + public static final String HELPER_NODE_NAME = "helperNodeName"; @ManagedAttribute(mandatory=true) String getGroupName(); @@ -65,4 +64,7 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends @DerivedAttribute Long getJoinTime(); + + @ManagedAttribute(persist = false) + String getHelperNodeName(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index cf1cea3efa..3868025096 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -36,6 +36,7 @@ import javax.security.auth.Subject; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.rep.NodeState; +import com.sleepycat.je.rep.NodeType; import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicationNode; import com.sleepycat.je.rep.StateChangeEvent; @@ -44,6 +45,8 @@ import com.sleepycat.je.rep.util.ReplicationGroupAdmin; import com.sleepycat.je.rep.utilint.HostPortPair; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.updater.Task; +import org.apache.qpid.server.configuration.updater.VoidTask; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; @@ -107,6 +110,9 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @ManagedAttributeField(afterSet="postSetRole") private String _role; + @ManagedAttributeField + private String _helperNodeName; + @ManagedObjectFactoryConstructor public BDBHAVirtualHostNodeImpl(Map<String, Object> attributes, Broker<?> broker) { @@ -202,6 +208,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu return -1L; } + @Override + public String getHelperNodeName() + { + return _helperNodeName; + } + @SuppressWarnings("rawtypes") @Override public Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes() @@ -260,6 +272,11 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.STORE_LOCATION(getStorePath())); ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) getConfigurationStore().getEnvironmentFacade(); + if (environmentFacade == null) + { + throw new IllegalStateException("Environment facade is not created"); + } + if (_environmentFacade.compareAndSet(null, environmentFacade)) { environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener()); @@ -276,11 +293,16 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } finally { - ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); - if (_environmentFacade.compareAndSet(environmentFacade, null)) - { - environmentFacade.close(); - } + stopEnvironment(); + } + } + + private void stopEnvironment() + { + ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); + if (_environmentFacade.compareAndSet(environmentFacade, null)) + { + environmentFacade.close(); } } @@ -598,7 +620,20 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu private class RemoteNodesDiscoverer implements ReplicationGroupListener { @Override - public void onReplicationNodeAddedToGroup(ReplicationNode node) + public void onReplicationNodeAddedToGroup(final ReplicationNode node) + { + getTaskExecutor().submit(new Task<Void>() + { + @Override + public Void execute() + { + addRemoteReplicationNode(node); + return null; + } + }); + } + + private void addRemoteReplicationNode(ReplicationNode node) { BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade()); remoteNode.create(); @@ -606,14 +641,40 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @Override - public void onReplicationNodeRecovered(ReplicationNode node) + public void onReplicationNodeRecovered(final ReplicationNode node) + { + getTaskExecutor().submit(new Task<Void>() + { + @Override + public Void execute() + { + recoverRemoteReplicationNode(node); + return null; + } + }); + } + + private void recoverRemoteReplicationNode(ReplicationNode node) { BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade()); remoteNode.open(); } @Override - public void onReplicationNodeRemovedFromGroup(ReplicationNode node) + public void onReplicationNodeRemovedFromGroup(final ReplicationNode node) + { + getTaskExecutor().submit(new Task<Void>() + { + @Override + public Void execute() + { + removeRemoteReplicationNode(node); + return null; + } + }); + } + + private void removeRemoteReplicationNode(ReplicationNode node) { BDBHARemoteReplicationNodeImpl remoteNode = getChildByName(BDBHARemoteReplicationNodeImpl.class, node.getName()); if (remoteNode != null) @@ -634,11 +695,55 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } else { - remoteNode.setRole(nodeState.getNodeState().name()); remoteNode.setJoinTime(nodeState.getJoinTime()); remoteNode.setLastTransactionId(nodeState.getCurrentTxnEndVLSN()); + remoteNode.setRole(nodeState.getNodeState().name()); + } + } + } + + @Override + public void onIntruderNode(ReplicationNode node) + { + boolean inManagementMode = getParent(Broker.class).isManagementMode(); + if (inManagementMode) + { + LOGGER.warn(String.format("Intruder replication node '%s' from '%s' is detected. Ignoring intruder in management mode", + node.getName(), node.getHostName() + ":" + node.getPort() )); + BDBHARemoteReplicationNodeImpl remoteNode = getChildByName(BDBHARemoteReplicationNodeImpl.class, node.getName()); + if (remoteNode == null) + { + addRemoteReplicationNode(node); } } + else + { + LOGGER.error(String.format("Intruder node '%s' from '%s' is detected. Stopping down virtual host node '%s'", + node.getName(), node.getHostName() + ":" + node.getPort(), BDBHAVirtualHostNodeImpl.this.toString() )); + + getTaskExecutor().submit(new Task<Void>() + { + @Override + public Void execute() + { + State state = getState(); + + if (state == State.ACTIVE) + { + try + { + stopAndSetStateTo(State.ERRORED); + } + finally + { + stopEnvironment(); + } + notifyStateChanged(state, State.ERRORED); + } + return null; + } + }); + } } private Map<String, Object> nodeToAttributes(ReplicationNode replicationNode) @@ -647,6 +752,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu attributes.put(ConfiguredObject.NAME, replicationNode.getName()); attributes.put(ConfiguredObject.DURABLE, false); attributes.put(BDBHARemoteReplicationNode.ADDRESS, replicationNode.getHostName() + ":" + replicationNode.getPort()); + attributes.put(BDBHARemoteReplicationNode.MONITOR, replicationNode.getType() == NodeType.MONITOR); return attributes; } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index 80f6e7ea49..d830d488db 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -23,11 +23,13 @@ package org.apache.qpid.server.store.berkeleydb; import static org.mockito.Mockito.when; import java.io.File; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -38,8 +40,13 @@ import java.util.concurrent.atomic.AtomicReference; import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicationConfig; +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutorImpl; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.MessageLogger; +import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfigurationChangeListener; @@ -505,6 +512,201 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase } + public void testIntruderProtection() throws Exception + { + int node1PortNumber = findFreePort(); + String helperAddress = "localhost:" + node1PortNumber; + String groupName = "group"; + + Map<String, Object> node1Attributes = new HashMap<String, Object>(); + node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1"); + node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress); + node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1"); + + BDBHAVirtualHostNode<?> node1 = createAndStartHaVHN(node1Attributes); + BDBHAVirtualHost<?> host = (BDBHAVirtualHost<?>)node1.getVirtualHost(); + + List<String> permittedNodes = new ArrayList<String>(); + int node2PortNumber = getNextAvailable(node1PortNumber+1); + permittedNodes.add(helperAddress); + permittedNodes.add("localhost:" + node2PortNumber); + host.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.PERMITTED_NODES, permittedNodes)); + + Map<String, Object> node2Attributes = new HashMap<String, Object>(); + node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2"); + node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node2PortNumber); + node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2"); + node2Attributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, "node1"); + node2Attributes.put(BDBHAVirtualHostNode.PRIORITY, 0); + + BDBHAVirtualHostNode<?> node2 = createAndStartHaVHN(node2Attributes); + + int node3PortNumber = getNextAvailable(node2PortNumber+1); + Map<String, Object> node3Attributes = new HashMap<String, Object>(); + node3Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node3Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node3Attributes.put(BDBHAVirtualHostNode.NAME, "node3"); + node3Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node3Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node3PortNumber); + node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "3"); + node3Attributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, "node1"); + node3Attributes.put(BDBHAVirtualHostNode.PRIORITY, 0); + + try + { + createHaVHN(node3Attributes); + fail("The VHN should not be permitted to join the group"); + } + catch(IllegalConfigurationException e) + { + assertEquals("Unexpected exception message", String.format("Node from '%s' is not permitted!", "localhost:" + node3PortNumber), e.getMessage()); + } + + // join node by skipping a step retrieving node state and checking the permitted hosts + node3Attributes.remove(BDBHAVirtualHostNode.HELPER_NODE_NAME); + + final CountDownLatch stopLatch = new CountDownLatch(1); + ConfigurationChangeListener listener = new NoopConfigurationChangeListener() + { + @Override + public void stateChanged(ConfiguredObject<?> object, State oldState, State newState) + { + if (newState == State.ERRORED) + { + stopLatch.countDown(); + } + } + }; + node1.addChangeListener(listener); + + createHaVHN(node3Attributes); + + assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(10, TimeUnit.SECONDS)); + } + + public void testIntruderProtectionInManagementMode() throws Exception + { + int node1PortNumber = findFreePort(); + String helperAddress = "localhost:" + node1PortNumber; + String groupName = "group"; + + Map<String, Object> node1Attributes = new HashMap<String, Object>(); + node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1"); + node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress); + node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1"); + + BDBHAVirtualHostNode<?> node1 = createAndStartHaVHN(node1Attributes); + + int node2PortNumber = getNextAvailable(node1PortNumber+1); + Map<String, Object> node2Attributes = new HashMap<String, Object>(); + node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2"); + node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node2PortNumber); + node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2"); + node2Attributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, "node1"); + node2Attributes.put(BDBHAVirtualHostNode.PRIORITY, 0); + + BDBHAVirtualHostNode<?> node2 = createAndStartHaVHN(node2Attributes); + + final CountDownLatch stopLatch = new CountDownLatch(1); + ConfigurationChangeListener listener = new NoopConfigurationChangeListener() + { + @Override + public void stateChanged(ConfiguredObject<?> object, State oldState, State newState) + { + if (newState == State.ERRORED) + { + stopLatch.countDown(); + } + } + }; + node1.addChangeListener(listener); + + BDBHAVirtualHost<?> host = (BDBHAVirtualHost<?>)node1.getVirtualHost(); + + List<String> permittedNodes = new ArrayList<String>(); + permittedNodes.add(helperAddress); + host.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.PERMITTED_NODES, permittedNodes)); + + assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(10, TimeUnit.SECONDS)); + + when(_broker.isManagementMode()).thenReturn(true); + node1.start(); + + awaitRemoteNodes(node1, 1); + + BDBHARemoteReplicationNode<?> remote = findRemoteNode(node1, node2.getName()); + remote.delete(); + } + + public void testIntruderConnectedBeforePermittedNodesAreSet() throws Exception + { + int node1PortNumber = findFreePort(); + String helperAddress = "localhost:" + node1PortNumber; + String groupName = "group"; + + Map<String, Object> node1Attributes = new HashMap<String, Object>(); + node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1"); + node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress); + node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1"); + + BDBHAVirtualHostNode<?> node1 = createAndStartHaVHN(node1Attributes); + + int node2PortNumber = getNextAvailable(node1PortNumber+1); + Map<String, Object> node2Attributes = new HashMap<String, Object>(); + node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2"); + node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node2PortNumber); + node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2"); + node2Attributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, "node1"); + + createAndStartHaVHN(node2Attributes); + + final CountDownLatch stopLatch = new CountDownLatch(1); + ConfigurationChangeListener listener = new NoopConfigurationChangeListener() + { + @Override + public void stateChanged(ConfiguredObject<?> object, State oldState, State newState) + { + if (newState == State.ERRORED) + { + stopLatch.countDown(); + } + } + }; + node1.addChangeListener(listener); + + BDBHAVirtualHost<?> host = (BDBHAVirtualHost<?>)node1.getVirtualHost(); + List<String> permittedNodes = new ArrayList<String>(); + permittedNodes.add(helperAddress); + host.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.PERMITTED_NODES, permittedNodes)); + + assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(20, TimeUnit.SECONDS)); + } + private BDBHARemoteReplicationNode<?> findRemoteNode(BDBHAVirtualHostNode<?> node, String name) { for (RemoteReplicationNode<?> remoteNode : node.getRemoteReplicationNodes()) diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 86d47a6a8b..89e618482b 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -24,14 +24,19 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestFileUtils; @@ -41,6 +46,7 @@ import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.Durability; import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.Transaction; import com.sleepycat.je.rep.NodeState; import com.sleepycat.je.rep.ReplicatedEnvironment; @@ -52,6 +58,7 @@ import com.sleepycat.je.rep.StateChangeListener; public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { + private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacadeTest.class); private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort(); private static final int LISTENER_TIMEOUT = 5; private static final int WAIT_STATE_CHANGE_TIMEOUT = 30; @@ -644,6 +651,105 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase } } + public void testSetPermittedNodes() throws Exception + { + ReplicatedEnvironmentFacade firstNode = createMaster(); + + Set<String> permittedNodes = new HashSet<String>(); + permittedNodes.add("localhost:" + TEST_NODE_PORT); + permittedNodes.add("localhost:" + getNextAvailable(TEST_NODE_PORT + 1)); + firstNode.setPermittedNodes(permittedNodes); + + NodeState nodeState = firstNode.getRemoteNodeState(new ReplicatedEnvironmentFacade.ReplicationNodeImpl(TEST_NODE_NAME, TEST_NODE_HOST_PORT)); + + Collection<String> appStatePermittedNodes = firstNode.bytesToPermittedNodeList(nodeState.getAppState()); + assertEquals("Unexpected permitted nodes", permittedNodes, new HashSet<String>(appStatePermittedNodes)); + } + + public void testPermittedNodeIsAllowedToConnect() throws Exception + { + ReplicatedEnvironmentFacade firstNode = createMaster(); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + + Set<String> permittedNodes = new HashSet<String>(); + permittedNodes.add("localhost:" + TEST_NODE_PORT); + permittedNodes.add(node1NodeHostPort); + firstNode.setPermittedNodes(permittedNodes); + + ReplicatedEnvironmentConfiguration configuration = createReplicatedEnvironmentConfiguration(TEST_NODE_NAME + "_1", node1NodeHostPort, false); + when(configuration.getHelperNodeName()).thenReturn(TEST_NODE_NAME); + + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.REPLICA); + ReplicatedEnvironmentFacade secondNode = createReplicatedEnvironmentFacade(TEST_NODE_NAME + "_1", + stateChangeListener, new NoopReplicationGroupListener(), configuration); + assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + assertEquals("Unexpected state", State.REPLICA.name(), secondNode.getNodeState()); + } + + public void testNotPermittedNodeIsNotAllowedToConnect() throws Exception + { + ReplicatedEnvironmentFacade firstNode = createMaster(); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + + Set<String> permittedNodes = new HashSet<String>(); + permittedNodes.add("localhost:" + TEST_NODE_PORT); + + firstNode.setPermittedNodes(permittedNodes); + + ReplicatedEnvironmentConfiguration configuration = createReplicatedEnvironmentConfiguration(TEST_NODE_NAME + "_1", node1NodeHostPort, false); + when(configuration.getHelperNodeName()).thenReturn(TEST_NODE_NAME); + + try + { + createReplicatedEnvironmentFacade(TEST_NODE_NAME + "_1", new TestStateChangeListener(State.REPLICA), new NoopReplicationGroupListener(), configuration); + fail("Node is not allowed to connect from " + node1NodeHostPort + " but environment was successfully created"); + } + catch (IllegalConfigurationException e) + { + assertEquals("Unexpected exception message", String.format("Node from '%s' is not permitted!", + node1NodeHostPort), e.getMessage()); + } + } + + public void testIntruderNodeIsDetected() throws Exception + { + final CountDownLatch intruderLatch = new CountDownLatch(1); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onIntruderNode(ReplicationNode node) + { + intruderLatch.countDown(); + } + }; + ReplicatedEnvironmentFacade firstNode = createMaster(listener); + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + + Set<String> permittedNodes = new HashSet<String>(); + permittedNodes.add("localhost:" + TEST_NODE_PORT); + + firstNode.setPermittedNodes(permittedNodes); + + String nodeName = TEST_NODE_NAME + "_1"; + File environmentPathFile = new File(_storePath, nodeName); + environmentPathFile.mkdirs(); + + ReplicationConfig replicationConfig = new ReplicationConfig(TEST_GROUP_NAME, TEST_NODE_NAME + "_1", node1NodeHostPort); + replicationConfig.setHelperHosts(TEST_NODE_HOST_PORT); + + EnvironmentConfig envConfig = new EnvironmentConfig(); + envConfig.setAllowCreate(true); + envConfig.setTransactional(true); + envConfig.setDurability(TEST_DURABILITY); + ReplicatedEnvironment intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); + assertTrue("Intruder node was not detected", intruderLatch.await(10, TimeUnit.SECONDS)); + } + private ReplicatedEnvironmentFacade createMaster() throws Exception { return createMaster(new NoopReplicationGroupListener()); @@ -678,6 +784,10 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) { ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary); + return createReplicatedEnvironmentFacade(nodeName, stateChangeListener, replicationGroupListener, config); + } + + private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodeName, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener, ReplicatedEnvironmentConfiguration config) { ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config); ref.setStateChangeListener(stateChangeListener); ref.setReplicationGroupListener(replicationGroupListener); @@ -735,5 +845,11 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { } + @Override + public void onIntruderNode(ReplicationNode node) + { + LOGGER.warn("Intruder node " + node); + } + } } diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java index ac6ca45877..1b904c94a4 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java @@ -31,11 +31,16 @@ import java.util.Map; import javax.servlet.http.HttpServletResponse; +import com.sleepycat.je.Durability; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; import org.apache.qpid.server.model.RemoteReplicationNode; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; import org.apache.qpid.systest.rest.Asserts; @@ -136,7 +141,7 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase Map<String, Object> remoteNode1 = findRemoteNodeByName(remoteNodes, NODE1); assertEquals("Node 1 observed from node 2 is in the wrong state", - "UNAVAILABLE", remoteNode1.get(BDBHARemoteReplicationNode.STATE)); + "UNAVAILABLE", remoteNode1.get(BDBHARemoteReplicationNode.STATE)); assertEquals("Node 1 observed from node 2 has the wrong role", "UNKNOWN", remoteNode1.get(BDBHARemoteReplicationNode.ROLE)); @@ -240,8 +245,57 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase assertEquals("Unexpected number of remote nodes on " + NODE2, 1, data.size()); } + public void testIntruderProtection() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); + + String virtualHostRestUrl = "virtualhost/" + NODE1 + "/" + _hostName; + + Map<String,Object> hostData = new HashMap<String,Object>(); + hostData.put(BDBHAVirtualHost.PERMITTED_NODES, Arrays.asList( "localhost:" + _node1HaPort, "localhost:" + _node3HaPort)); + getRestTestHelper().submitRequest(virtualHostRestUrl, "PUT", hostData, 200); + + // add permitted node + Map<String, Object> node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort); + node3Data.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, NODE1); + getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, 201); + assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1); + assertRemoteNodes(NODE1, NODE3); + + // try to add not permitted node + Map<String, Object> nodeData = createNodeAttributeMap(NODE2, _node2HaPort, _node1HaPort); + nodeData.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, NODE1); + getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "PUT", nodeData, 409); + + assertRemoteNodes(NODE1, NODE3); + + //connect intruder node + String nodeName = NODE2; + String nodeHostPort = (String)nodeData.get(BDBHAVirtualHostNode.ADDRESS); + File environmentPathFile = new File((String)nodeData.get(BDBHAVirtualHostNode.STORE_PATH), nodeName); + environmentPathFile.mkdirs(); + ReplicationConfig replicationConfig = new ReplicationConfig((String)nodeData.get(BDBHAVirtualHostNode.GROUP_NAME), nodeName, nodeHostPort); + replicationConfig.setHelperHosts((String)nodeData.get(BDBHAVirtualHostNode.HELPER_ADDRESS)); + EnvironmentConfig envConfig = new EnvironmentConfig(); + envConfig.setAllowCreate(true); + envConfig.setTransactional(true); + envConfig.setDurability(Durability.parse((String)nodeData.get(BDBHAVirtualHostNode.DURABILITY))); + ReplicatedEnvironment intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); + + waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHost.STATE, State.ERRORED.name()); + } + private void createHANode(String nodeName, int nodePort, int helperPort) throws Exception { + Map<String, Object> nodeData = createNodeAttributeMap(nodeName, nodePort, helperPort); + + int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData); + assertEquals("Unexpected response code for virtual host node " + nodeName + " creation request", 201, responseCode); + } + + private Map<String, Object> createNodeAttributeMap(String nodeName, int nodePort, int helperPort) + { Map<String, Object> nodeData = new HashMap<String, Object>(); nodeData.put(BDBHAVirtualHostNode.NAME, nodeName); nodeData.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); @@ -249,9 +303,7 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase nodeData.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName); nodeData.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + nodePort); nodeData.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + helperPort); - - int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData); - assertEquals("Unexpected response code for virtual host node " + nodeName + " creation request", 201, responseCode); + return nodeData; } private void assertNode(String nodeName, int nodePort, int nodeHelperPort, String masterNode) throws Exception diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java index c1e2f59bd8..e0c03fe822 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.configuration.updater; import java.util.concurrent.CancellationException; +import java.util.concurrent.Future; public interface TaskExecutor { @@ -40,4 +41,6 @@ public interface TaskExecutor <T> T run(Task<T> task) throws CancellationException; + <T> Future<T> submit(Task<T> task) throws CancellationException; + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java index b140fc4a61..96e4e256b2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java @@ -117,7 +117,8 @@ public class TaskExecutorImpl implements TaskExecutor } } - <T> Future<T> submit(Task<T> task) + @Override + public <T> Future<T> submit(Task<T> task) { checkState(); if (LOGGER.isDebugEnabled()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java index 44bc923f0e..0cb69e4cd8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java @@ -88,7 +88,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< } - @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED }, desiredState = State.ACTIVE ) + @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) private void doActivate() { try @@ -176,9 +176,14 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< @StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED }, desiredState = State.STOPPED ) protected void doStop() { + stopAndSetStateTo(State.STOPPED); + } + + protected void stopAndSetStateTo(State stoppedState) + { closeChildren(); closeConfigurationStore(); - _state.set(State.STOPPED); + _state.set(stoppedState); } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java index 6e21e5325d..4343419505 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java @@ -21,6 +21,10 @@ package org.apache.qpid.server.configuration.updater; import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; public class CurrentThreadTaskExecutor implements TaskExecutor @@ -94,6 +98,45 @@ public class CurrentThreadTaskExecutor implements TaskExecutor return task.execute(); } + @Override + public <T> Future<T> submit(Task<T> task) throws CancellationException + { + checkThread(); + final T result = task.execute(); + return new Future<T>() + { + @Override + public boolean cancel(boolean mayInterruptIfRunning) + { + return true; + } + + @Override + public boolean isCancelled() + { + return false; + } + + @Override + public boolean isDone() + { + return true; + } + + @Override + public T get() throws InterruptedException, ExecutionException + { + return result; + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + { + return result; + } + }; + } + public static TaskExecutor newStartedInstance() { TaskExecutor executor = new CurrentThreadTaskExecutor(); diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java index 3eec242703..10f56cef58 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java @@ -463,7 +463,7 @@ public class JMXManagementPluginImpl @Override public void stateChanged(ConfiguredObject<?> object, State oldState, State newState) { - if (newState == State.DELETED || newState == State.STOPPED) + if (newState == State.DELETED || newState == State.STOPPED || newState == State.ERRORED) { destroyObjectMBeans(object, newState == State.DELETED); } |
