summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-09-06 11:07:24 +0000
committerKeith Wall <kwall@apache.org>2014-09-06 11:07:24 +0000
commit517ed34077143fd473462b48489d65dfa640b691 (patch)
tree8f67a2a517e2d24b02a077323f190a950e20841a /qpid/java/bdbstore/src/main
parent974b5ff713cd10c741b7dbce70e7974d16546a9c (diff)
downloadqpid-python-517ed34077143fd473462b48489d65dfa640b691.tar.gz
QPID-6074: [Java Broker] BDB HA VHN implement to use context variables rather than system properties to control timeouts etc
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1622846 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java1
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java69
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java14
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java3
4 files changed, 69 insertions, 18 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
index ae0d4b13ae..18b9727a61 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
@@ -35,4 +35,5 @@ public interface ReplicatedEnvironmentConfiguration extends StandardEnvironmentC
int getQuorumOverride();
Map<String, String> getReplicationParameters();
String getHelperNodeName();
+ int getFacadeParameter(String parameterName, int defaultValue);
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index ef583e04dc..4cd677586a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -95,19 +95,44 @@ import org.apache.qpid.server.util.DaemonThreadFactory;
public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
{
public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval";
- public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.replication.db_ping_socket_timeout";
+ public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout";
public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.remote_node_monitor_interval";
+ public static final String ENVIRONMENT_RESTART_RETRY_LIMIT_PROPERTY_NAME = "qpid.bdb.ha.environment_restart_retry_limit";
+ public static final String EXECUTOR_SHUTDOWN_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.executor_shutdown_timeout";
private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class);
private static final int DEFAULT_MASTER_TRANSFER_TIMEOUT = 1000 * 60;
private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000;
private static final int DEFAULT_REMOTE_NODE_MONITOR_INTERVAL = 1000;
+ private static final int DEFAULT_ENVIRONMENT_RESTART_RETRY_LIMIT = 3;
+ private static final int DEFAULT_EXECUTOR_SHUTDOWN_TIMEOUT = 5000;
- private static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT);
- public static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT);
- private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL);
- private static final int RESTART_TRY_LIMIT = 3;
+ /** Length of time allowed for a master transfer to complete before the operation will timeout */
+ private final int _masterTransferTimeout;
+
+ /**
+ * Qpid monitors the health of the other nodes in the group. This controls the length of time
+ * between each scan of the all the nodes.
+ */
+ private final int _remoteNodeMonitorInterval;
+
+ /**
+ * Qpid uses DbPing to establish the health of other nodes in the group. This controls the socket timeout
+ * (so_timeout) used on the socket underlying DbPing.
+ */
+ private final int _dbPingSocketTimeout;
+
+ /**
+ * If the environment creation fails, Qpid will automatically retry. This controls the number
+ * of times recreation will be attempted.
+ */
+ private final int _environmentRestartRetryLimit;
+
+ /**
+ * Length of time for executors used by facade to shutdown gracefully
+ */
+ private final int _executorShutdownTimeout;
static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.SYNC;
static final SyncPolicy REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC;
@@ -191,6 +216,13 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
_configuration = configuration;
+
+ _masterTransferTimeout = configuration.getFacadeParameter(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT);
+ _dbPingSocketTimeout = configuration.getFacadeParameter(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT);
+ _remoteNodeMonitorInterval = configuration.getFacadeParameter(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL);
+ _environmentRestartRetryLimit = configuration.getFacadeParameter(ENVIRONMENT_RESTART_RETRY_LIMIT_PROPERTY_NAME, DEFAULT_ENVIRONMENT_RESTART_RETRY_LIMIT);
+ _executorShutdownTimeout = configuration.getFacadeParameter(EXECUTOR_SHUTDOWN_TIMEOUT_PROPERTY_NAME, DEFAULT_EXECUTOR_SHUTDOWN_TIMEOUT);
+
_defaultDurability = new Durability(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY);
_prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName();
@@ -319,10 +351,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
executorService.shutdown();
try
{
- boolean wasShutdown = executorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
+ boolean wasShutdown = executorService.awaitTermination(_executorShutdownTimeout, TimeUnit.MILLISECONDS);
if (!wasShutdown)
{
- LOGGER.warn("Executor service " + executorService + " did not shutdown within allowed time period, ignoring");
+ LOGGER.warn("Executor service " + executorService +
+ " did not shutdown within allowed time period " + _executorShutdownTimeout +
+ ", ignoring");
}
}
catch (InterruptedException e)
@@ -368,7 +402,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public void run()
{
- for (int i = 0; i < RESTART_TRY_LIMIT; i++)
+ for (int i = 0; i < _environmentRestartRetryLimit; i++)
{
try
{
@@ -757,7 +791,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
try
{
ReplicationGroupAdmin admin = createReplicationGroupAdmin();
- String newMaster = admin.transferMaster(Collections.singleton(nodeName), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true);
+ String newMaster = admin.transferMaster(Collections.singleton(nodeName),
+ _masterTransferTimeout, TimeUnit.MILLISECONDS, true);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("The mastership has been transferred to " + newMaster);
@@ -766,7 +801,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
catch (DatabaseException e)
{
LOGGER.warn("Exception on transferring the mastership to " + _prettyGroupNodeName
- + " Master transfer timeout : " + MASTER_TRANSFER_TIMEOUT, e);
+ + " Master transfer timeout : " + _masterTransferTimeout, e);
throw e;
}
return null;
@@ -1192,13 +1227,13 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return Collections.unmodifiableSet(_permittedNodes);
}
- public static NodeState getRemoteNodeState(String groupName, ReplicationNode repNode) throws IOException, ServiceConnectFailedException
+ static NodeState getRemoteNodeState(String groupName, ReplicationNode repNode, int dbPingSocketTimeout) throws IOException, ServiceConnectFailedException
{
if (repNode == null)
{
throw new IllegalArgumentException("Node cannot be null");
}
- return new DbPing(repNode, groupName, DB_PING_SOCKET_TIMEOUT).getNodeState();
+ return new DbPing(repNode, groupName, dbPingSocketTimeout).getNodeState();
}
public static Set<String> convertApplicationStateBytesToPermittedNodeList(byte[] applicationState)
@@ -1220,7 +1255,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- public static Collection<String> connectToHelperNodeAndCheckPermittedHosts(String nodeName, String hostPort, String groupName, String helperNodeName, String helperHostPort)
+ public static Collection<String> connectToHelperNodeAndCheckPermittedHosts(String nodeName, String hostPort, String groupName, String helperNodeName, String helperHostPort, int dbPingSocketTimeout)
{
if (LOGGER.isDebugEnabled())
{
@@ -1237,7 +1272,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
try
{
ReplicationNodeImpl node = new ReplicationNodeImpl(helperNodeName, helperHostPort);
- NodeState state = getRemoteNodeState(groupName, node);
+ NodeState state = getRemoteNodeState(groupName, node, dbPingSocketTimeout);
byte[] applicationState = state.getAppState();
permittedNodes = convertApplicationStateBytesToPermittedNodeList(applicationState);
}
@@ -1390,7 +1425,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
State state = _state.get();
if (state != State.CLOSED && state != State.CLOSING && continueMonitoring)
{
- _groupChangeExecutor.schedule(this, REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+ _groupChangeExecutor.schedule(this, _remoteNodeMonitorInterval, TimeUnit.MILLISECONDS);
}
else
{
@@ -1491,7 +1526,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
NodeState nodeStateObject = null;
try
{
- nodeStateObject = getRemoteNodeState((String)_configuration.getGroupName(), node);
+ nodeStateObject = getRemoteNodeState(_configuration.getGroupName(), node, _dbPingSocketTimeout);
}
catch (IOException | ServiceConnectFailedException | com.sleepycat.je.rep.utilint.BinaryProtocol.ProtocolException e )
{
@@ -1509,7 +1544,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
try
{
- future.get(REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+ future.get(_remoteNodeMonitorInterval, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index 109bfe6978..09d681608f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -107,6 +107,20 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
{
return settings.getGroupName();
}
+
+ @Override
+ public int getFacadeParameter(final String parameterName, final int defaultValue)
+ {
+ if (parent.getContextKeys(false).contains(parameterName))
+ {
+ return parent.getContextValue(Integer.class, parameterName);
+ }
+ else
+ {
+ return defaultValue;
+ }
+ }
+
};
return new ReplicatedEnvironmentFacade(configuration);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 90537bfbb4..b9f0764f67 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -284,7 +284,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
try
{
- Collection<String> permittedNodes = ReplicatedEnvironmentFacade.connectToHelperNodeAndCheckPermittedHosts(getName(), getAddress(), getGroupName(), getHelperNodeName(), getHelperAddress());
+ int dbPingSocketTimeout = getContextKeys(false).contains("qpid.bdb.ha.db_ping_socket_timeout") ? getContextValue(Integer.class, "qpid.bdb.ha.db_ping_socket_timeout") : 10000 /* JE's own default */;
+ Collection<String> permittedNodes = ReplicatedEnvironmentFacade.connectToHelperNodeAndCheckPermittedHosts(getName(), getAddress(), getGroupName(), getHelperNodeName(), getHelperAddress(), dbPingSocketTimeout);
setAttribute(PERMITTED_NODES, null, new ArrayList<String>(permittedNodes));
}
catch(IllegalConfigurationException e)