diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-08-06 12:21:59 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-08-06 12:21:59 +0000 |
| commit | 1f2d3541f070cedbda87e3df6ee692edbcdf9381 (patch) | |
| tree | 0142e6a7babc1f00b87c805f567d6338a653986f /qpid/java/bdbstore/src/main | |
| parent | 90c8a29045f18554fd4c2da5ad01dd00af11cae7 (diff) | |
| download | qpid-python-1f2d3541f070cedbda87e3df6ee692edbcdf9381.tar.gz | |
QPID-5967: Intruder node detection must be mandatory and should validate all necessary arguments
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616186 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
4 files changed, 128 insertions, 85 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 4c98f9fb26..92115dd39f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -105,7 +105,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private static final int DEFAULT_REMOTE_NODE_MONITOR_INTERVAL = 1000; private static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT); - private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT); + public static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT); private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL); private static final int RESTART_TRY_LIMIT = 3; @@ -149,7 +149,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); }}); - private static final String PERMITTED_NODE_LIST = "permittedNodes"; + public static final String PERMITTED_NODE_LIST = "permittedNodes"; private final ReplicatedEnvironmentConfiguration _configuration; private final String _prettyGroupNodeName; @@ -978,7 +978,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.info("Node name " + nodeName); LOGGER.info("Node host port " + hostPort); LOGGER.info("Helper host port " + helperHostPort); - LOGGER.info("Helper host name " + helperNodeName); + LOGGER.info("Helper node name " + helperNodeName); LOGGER.info("Durability " + _defaultDurability); LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary); LOGGER.info("Node priority " + priority); @@ -986,10 +986,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.info("Permitted node list " + _permittedNodes); } - if (helperNodeName != null && _permittedNodes.isEmpty() && !helperHostPort.equals(hostPort)) - { - connectToHelperNodeAndCheckPermittedHosts(helperNodeName, helperHostPort, hostPort); - } Map<String, String> replicationEnvironmentParameters = new HashMap<>(ReplicatedEnvironmentFacade.REPCONFIG_DEFAULTS); replicationEnvironmentParameters.putAll(_configuration.getReplicationParameters()); @@ -1241,64 +1237,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return baos.toByteArray(); } - Collection<String> bytesToPermittedNodeList(byte[] applicationState) - { - if (applicationState == null || applicationState.length == 0) - { - return Collections.emptySet(); - } - - ObjectMapper objectMapper = new ObjectMapper(); - try - { - Map<String, Object> settings = objectMapper.readValue(applicationState, Map.class); - return (Collection<String>)settings.get(PERMITTED_NODE_LIST); - } - catch (Exception e) - { - throw new RuntimeException("Unexpected exception on de-serializing of application state", e); - } - } - - private void connectToHelperNodeAndCheckPermittedHosts(String helperNodeName, String helperHostPort, String hostPort) - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug(String.format("Requesting state of the node '%s' at '%s'", helperNodeName, helperHostPort)); - } - - Collection<String> permittedNodes = null; - try - { - NodeState state = getRemoteNodeState(new ReplicationNodeImpl(helperNodeName, helperHostPort)); - byte[] applicationState = state.getAppState(); - permittedNodes = bytesToPermittedNodeList(applicationState); - } - catch (IOException e) - { - throw new IllegalConfigurationException(String.format("Cannot connect to '%s'", helperHostPort), e); - } - catch (ServiceConnectFailedException e) - { - throw new IllegalConfigurationException(String.format("Failure to connect to '%s'", helperHostPort), e); - } - catch (Exception e) - { - throw new RuntimeException(String.format("Unexpected exception on attempt to retrieve state from '%s' at '%s'", - helperNodeName, helperHostPort), e); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug(String.format("Attribute 'permittedNodes' on node '%s' is set to '%s'", helperNodeName, String.valueOf(permittedNodes))); - } - - if (permittedNodes != null && !permittedNodes.isEmpty() && !permittedNodes.contains(hostPort)) - { - throw new IllegalConfigurationException(String.format("Node from '%s' is not permitted!", hostPort)); - } - } - private void populateExistingRemoteReplicationNodes() { ReplicationGroup group = _environment.getGroup(); @@ -1542,7 +1480,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - static class ReplicationNodeImpl implements ReplicationNode + public static class ReplicationNodeImpl implements ReplicationNode { private final InetSocketAddress _address; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java index 477ffb5a64..e2503a8b51 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java @@ -56,6 +56,6 @@ public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends Virtual @ManagedAttribute(mandatory = true, defaultValue = "0") Long getStoreOverfullSize(); - @ManagedAttribute + @ManagedAttribute(mandatory = true) List<String> getPermittedNodes(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java index aaa5f6aca4..a005bca194 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java @@ -134,26 +134,30 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm if(changedAttributes.contains(PERMITTED_NODES)) { + validatePermittedNodes(((BDBHAVirtualHost<?>)proxyForValidation).getPermittedNodes()); + } + } - List<String> permittedNodes = ((BDBHAVirtualHost<?>)proxyForValidation).getPermittedNodes(); - if (permittedNodes != null) + private void validatePermittedNodes(List<String> permittedNodes) + { + if (permittedNodes == null || permittedNodes.isEmpty()) + { + throw new IllegalArgumentException(String.format("Attribute '%s' is mandatory and must be set", PERMITTED_NODES)); + } + for (String permittedNode: permittedNodes) + { + String[] tokens = permittedNode.split(":"); + if (tokens.length != 2) + { + throw new IllegalArgumentException(String.format("Invalid permitted node specified '%s'. ", permittedNode)); + } + try { - for (String permittedNode: permittedNodes) - { - String[] tokens = permittedNode.split(":"); - if (tokens.length != 2) - { - throw new IllegalArgumentException(String.format("Invalid permitted node specified '%s'. ", permittedNode)); - } - try - { - Integer.parseInt(tokens[1]); - } - catch(Exception e) - { - throw new IllegalArgumentException(String.format("Invalid port is specified in permitted node '%s'. ", permittedNode)); - } - } + Integer.parseInt(tokens[1]); + } + catch(Exception e) + { + throw new IllegalArgumentException(String.format("Invalid port is specified in permitted node '%s'. ", permittedNode)); } } } @@ -193,6 +197,16 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm return _permittedNodes; } + @Override + public void onValidate() + { + super.onValidate(); + + validatePermittedNodes(this.getPermittedNodes()); + validateTransactionSynchronizationPolicy(this.getLocalTransactionSynchronizationPolicy()); + validateTransactionSynchronizationPolicy(this.getRemoteTransactionSynchronizationPolicy()); + } + protected void applyPermittedNodes() { ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 2cdb6ec635..83a2054793 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -20,9 +20,11 @@ */ package org.apache.qpid.server.virtualhostnode.berkeleydb; +import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -41,10 +43,13 @@ import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicationNode; import com.sleepycat.je.rep.StateChangeEvent; import com.sleepycat.je.rep.StateChangeListener; +import com.sleepycat.je.rep.util.DbPing; import com.sleepycat.je.rep.util.ReplicationGroupAdmin; import com.sleepycat.je.rep.utilint.HostPortPair; +import com.sleepycat.je.rep.utilint.ServiceDispatcher; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.Task; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.HighAvailabilityMessages; @@ -66,8 +71,10 @@ import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironment import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.store.berkeleydb.replication.ReplicationGroupListener; import org.apache.qpid.server.util.ServerScopedRuntimeException; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl; import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; +import org.codehaus.jackson.map.ObjectMapper; @ManagedObject( category = false, type = BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE ) public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl> implements @@ -253,6 +260,19 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu public void onCreate() { super.onCreate(); + if (!isFirstNodeInAGroup()) + { + try + { + connectToHelperNodeAndCheckPermittedHosts(getHelperNodeName(), getHelperAddress(), getAddress()); + } + catch(IllegalConfigurationException e) + { + deleted(); + throw e; + } + } + getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ADDED(getName(), getGroupName())); } @@ -383,6 +403,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu return helpers; } + @Override protected void onClose() { try @@ -675,6 +696,76 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } + private boolean isFirstNodeInAGroup() + { + return getAddress().equals(getHelperAddress()); + } + + private void connectToHelperNodeAndCheckPermittedHosts(String helperNodeName, String helperHostPort, String hostPort) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(String.format("Requesting state of the node '%s' at '%s'", helperNodeName, helperHostPort)); + } + + if (_helperNodeName == null || "".equals(_helperNodeName)) + { + throw new IllegalConfigurationException(String.format("An attribute '%s' is not set in node '%s'" + + " on joining the group '%s'", HELPER_NODE_NAME, getName(), getGroupName())); + } + + Collection<String> permittedNodes = null; + try + { + ReplicatedEnvironmentFacade.ReplicationNodeImpl node = new ReplicatedEnvironmentFacade.ReplicationNodeImpl(helperNodeName, helperHostPort); + NodeState state = new DbPing(node, getGroupName(), ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT).getNodeState(); + byte[] applicationState = state.getAppState(); + permittedNodes = bytesToPermittedNodeList(applicationState); + } + catch (IOException e) + { + throw new IllegalConfigurationException(String.format("Cannot connect to '%s'", helperHostPort), e); + } + catch (ServiceDispatcher.ServiceConnectFailedException e) + { + throw new IllegalConfigurationException(String.format("Failure to connect to '%s'", helperHostPort), e); + } + catch (Exception e) + { + throw new RuntimeException(String.format("Unexpected exception on attempt to retrieve state from '%s' at '%s'", + helperNodeName, helperHostPort), e); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(String.format("Attribute 'permittedNodes' on node '%s' is set to '%s'", helperNodeName, String.valueOf(permittedNodes))); + } + + if (permittedNodes != null && !permittedNodes.isEmpty() && !permittedNodes.contains(hostPort)) + { + throw new IllegalConfigurationException(String.format("Node from '%s' is not permitted!", hostPort)); + } + } + + private Collection<String> bytesToPermittedNodeList(byte[] applicationState) + { + if (applicationState == null || applicationState.length == 0) + { + return Collections.emptySet(); + } + + ObjectMapper objectMapper = new ObjectMapper(); + try + { + Map<String, Object> settings = objectMapper.readValue(applicationState, Map.class); + return (Collection<String>)settings.get(ReplicatedEnvironmentFacade.PERMITTED_NODE_LIST); + } + catch (Exception e) + { + throw new RuntimeException("Unexpected exception on de-serializing of application state", e); + } + } + private class RemoteNodesDiscoverer implements ReplicationGroupListener { @Override |
