summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-08-06 12:21:59 +0000
committerAlex Rudyy <orudyy@apache.org>2014-08-06 12:21:59 +0000
commit1f2d3541f070cedbda87e3df6ee692edbcdf9381 (patch)
tree0142e6a7babc1f00b87c805f567d6338a653986f /qpid/java/bdbstore/src/main
parent90c8a29045f18554fd4c2da5ad01dd00af11cae7 (diff)
downloadqpid-python-1f2d3541f070cedbda87e3df6ee692edbcdf9381.tar.gz
QPID-5967: Intruder node detection must be mandatory and should validate all necessary arguments
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616186 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java70
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java50
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java91
4 files changed, 128 insertions, 85 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 4c98f9fb26..92115dd39f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -105,7 +105,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private static final int DEFAULT_REMOTE_NODE_MONITOR_INTERVAL = 1000;
private static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT);
- private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT);
+ public static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT);
private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL);
private static final int RESTART_TRY_LIMIT = 3;
@@ -149,7 +149,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
}});
- private static final String PERMITTED_NODE_LIST = "permittedNodes";
+ public static final String PERMITTED_NODE_LIST = "permittedNodes";
private final ReplicatedEnvironmentConfiguration _configuration;
private final String _prettyGroupNodeName;
@@ -978,7 +978,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.info("Node name " + nodeName);
LOGGER.info("Node host port " + hostPort);
LOGGER.info("Helper host port " + helperHostPort);
- LOGGER.info("Helper host name " + helperNodeName);
+ LOGGER.info("Helper node name " + helperNodeName);
LOGGER.info("Durability " + _defaultDurability);
LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
LOGGER.info("Node priority " + priority);
@@ -986,10 +986,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.info("Permitted node list " + _permittedNodes);
}
- if (helperNodeName != null && _permittedNodes.isEmpty() && !helperHostPort.equals(hostPort))
- {
- connectToHelperNodeAndCheckPermittedHosts(helperNodeName, helperHostPort, hostPort);
- }
Map<String, String> replicationEnvironmentParameters = new HashMap<>(ReplicatedEnvironmentFacade.REPCONFIG_DEFAULTS);
replicationEnvironmentParameters.putAll(_configuration.getReplicationParameters());
@@ -1241,64 +1237,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return baos.toByteArray();
}
- Collection<String> bytesToPermittedNodeList(byte[] applicationState)
- {
- if (applicationState == null || applicationState.length == 0)
- {
- return Collections.emptySet();
- }
-
- ObjectMapper objectMapper = new ObjectMapper();
- try
- {
- Map<String, Object> settings = objectMapper.readValue(applicationState, Map.class);
- return (Collection<String>)settings.get(PERMITTED_NODE_LIST);
- }
- catch (Exception e)
- {
- throw new RuntimeException("Unexpected exception on de-serializing of application state", e);
- }
- }
-
- private void connectToHelperNodeAndCheckPermittedHosts(String helperNodeName, String helperHostPort, String hostPort)
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug(String.format("Requesting state of the node '%s' at '%s'", helperNodeName, helperHostPort));
- }
-
- Collection<String> permittedNodes = null;
- try
- {
- NodeState state = getRemoteNodeState(new ReplicationNodeImpl(helperNodeName, helperHostPort));
- byte[] applicationState = state.getAppState();
- permittedNodes = bytesToPermittedNodeList(applicationState);
- }
- catch (IOException e)
- {
- throw new IllegalConfigurationException(String.format("Cannot connect to '%s'", helperHostPort), e);
- }
- catch (ServiceConnectFailedException e)
- {
- throw new IllegalConfigurationException(String.format("Failure to connect to '%s'", helperHostPort), e);
- }
- catch (Exception e)
- {
- throw new RuntimeException(String.format("Unexpected exception on attempt to retrieve state from '%s' at '%s'",
- helperNodeName, helperHostPort), e);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug(String.format("Attribute 'permittedNodes' on node '%s' is set to '%s'", helperNodeName, String.valueOf(permittedNodes)));
- }
-
- if (permittedNodes != null && !permittedNodes.isEmpty() && !permittedNodes.contains(hostPort))
- {
- throw new IllegalConfigurationException(String.format("Node from '%s' is not permitted!", hostPort));
- }
- }
-
private void populateExistingRemoteReplicationNodes()
{
ReplicationGroup group = _environment.getGroup();
@@ -1542,7 +1480,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- static class ReplicationNodeImpl implements ReplicationNode
+ public static class ReplicationNodeImpl implements ReplicationNode
{
private final InetSocketAddress _address;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
index 477ffb5a64..e2503a8b51 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
@@ -56,6 +56,6 @@ public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends Virtual
@ManagedAttribute(mandatory = true, defaultValue = "0")
Long getStoreOverfullSize();
- @ManagedAttribute
+ @ManagedAttribute(mandatory = true)
List<String> getPermittedNodes();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
index aaa5f6aca4..a005bca194 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
@@ -134,26 +134,30 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
if(changedAttributes.contains(PERMITTED_NODES))
{
+ validatePermittedNodes(((BDBHAVirtualHost<?>)proxyForValidation).getPermittedNodes());
+ }
+ }
- List<String> permittedNodes = ((BDBHAVirtualHost<?>)proxyForValidation).getPermittedNodes();
- if (permittedNodes != null)
+ private void validatePermittedNodes(List<String> permittedNodes)
+ {
+ if (permittedNodes == null || permittedNodes.isEmpty())
+ {
+ throw new IllegalArgumentException(String.format("Attribute '%s' is mandatory and must be set", PERMITTED_NODES));
+ }
+ for (String permittedNode: permittedNodes)
+ {
+ String[] tokens = permittedNode.split(":");
+ if (tokens.length != 2)
+ {
+ throw new IllegalArgumentException(String.format("Invalid permitted node specified '%s'. ", permittedNode));
+ }
+ try
{
- for (String permittedNode: permittedNodes)
- {
- String[] tokens = permittedNode.split(":");
- if (tokens.length != 2)
- {
- throw new IllegalArgumentException(String.format("Invalid permitted node specified '%s'. ", permittedNode));
- }
- try
- {
- Integer.parseInt(tokens[1]);
- }
- catch(Exception e)
- {
- throw new IllegalArgumentException(String.format("Invalid port is specified in permitted node '%s'. ", permittedNode));
- }
- }
+ Integer.parseInt(tokens[1]);
+ }
+ catch(Exception e)
+ {
+ throw new IllegalArgumentException(String.format("Invalid port is specified in permitted node '%s'. ", permittedNode));
}
}
}
@@ -193,6 +197,16 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
return _permittedNodes;
}
+ @Override
+ public void onValidate()
+ {
+ super.onValidate();
+
+ validatePermittedNodes(this.getPermittedNodes());
+ validateTransactionSynchronizationPolicy(this.getLocalTransactionSynchronizationPolicy());
+ validateTransactionSynchronizationPolicy(this.getRemoteTransactionSynchronizationPolicy());
+ }
+
protected void applyPermittedNodes()
{
ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 2cdb6ec635..83a2054793 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -20,9 +20,11 @@
*/
package org.apache.qpid.server.virtualhostnode.berkeleydb;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -41,10 +43,13 @@ import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.util.DbPing;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
import com.sleepycat.je.rep.utilint.HostPortPair;
+import com.sleepycat.je.rep.utilint.ServiceDispatcher;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.HighAvailabilityMessages;
@@ -66,8 +71,10 @@ import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironment
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicationGroupListener;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
+import org.codehaus.jackson.map.ObjectMapper;
@ManagedObject( category = false, type = BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE )
public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl> implements
@@ -253,6 +260,19 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
public void onCreate()
{
super.onCreate();
+ if (!isFirstNodeInAGroup())
+ {
+ try
+ {
+ connectToHelperNodeAndCheckPermittedHosts(getHelperNodeName(), getHelperAddress(), getAddress());
+ }
+ catch(IllegalConfigurationException e)
+ {
+ deleted();
+ throw e;
+ }
+ }
+
getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ADDED(getName(), getGroupName()));
}
@@ -383,6 +403,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
return helpers;
}
+ @Override
protected void onClose()
{
try
@@ -675,6 +696,76 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
}
+ private boolean isFirstNodeInAGroup()
+ {
+ return getAddress().equals(getHelperAddress());
+ }
+
+ private void connectToHelperNodeAndCheckPermittedHosts(String helperNodeName, String helperHostPort, String hostPort)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug(String.format("Requesting state of the node '%s' at '%s'", helperNodeName, helperHostPort));
+ }
+
+ if (_helperNodeName == null || "".equals(_helperNodeName))
+ {
+ throw new IllegalConfigurationException(String.format("An attribute '%s' is not set in node '%s'"
+ + " on joining the group '%s'", HELPER_NODE_NAME, getName(), getGroupName()));
+ }
+
+ Collection<String> permittedNodes = null;
+ try
+ {
+ ReplicatedEnvironmentFacade.ReplicationNodeImpl node = new ReplicatedEnvironmentFacade.ReplicationNodeImpl(helperNodeName, helperHostPort);
+ NodeState state = new DbPing(node, getGroupName(), ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT).getNodeState();
+ byte[] applicationState = state.getAppState();
+ permittedNodes = bytesToPermittedNodeList(applicationState);
+ }
+ catch (IOException e)
+ {
+ throw new IllegalConfigurationException(String.format("Cannot connect to '%s'", helperHostPort), e);
+ }
+ catch (ServiceDispatcher.ServiceConnectFailedException e)
+ {
+ throw new IllegalConfigurationException(String.format("Failure to connect to '%s'", helperHostPort), e);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(String.format("Unexpected exception on attempt to retrieve state from '%s' at '%s'",
+ helperNodeName, helperHostPort), e);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug(String.format("Attribute 'permittedNodes' on node '%s' is set to '%s'", helperNodeName, String.valueOf(permittedNodes)));
+ }
+
+ if (permittedNodes != null && !permittedNodes.isEmpty() && !permittedNodes.contains(hostPort))
+ {
+ throw new IllegalConfigurationException(String.format("Node from '%s' is not permitted!", hostPort));
+ }
+ }
+
+ private Collection<String> bytesToPermittedNodeList(byte[] applicationState)
+ {
+ if (applicationState == null || applicationState.length == 0)
+ {
+ return Collections.emptySet();
+ }
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ Map<String, Object> settings = objectMapper.readValue(applicationState, Map.class);
+ return (Collection<String>)settings.get(ReplicatedEnvironmentFacade.PERMITTED_NODE_LIST);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Unexpected exception on de-serializing of application state", e);
+ }
+ }
+
private class RemoteNodesDiscoverer implements ReplicationGroupListener
{
@Override