diff options
| author | Keith Wall <kwall@apache.org> | 2014-09-06 11:07:24 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-09-06 11:07:24 +0000 |
| commit | 517ed34077143fd473462b48489d65dfa640b691 (patch) | |
| tree | 8f67a2a517e2d24b02a077323f190a950e20841a /qpid/java/bdbstore/src/main | |
| parent | 974b5ff713cd10c741b7dbce70e7974d16546a9c (diff) | |
| download | qpid-python-517ed34077143fd473462b48489d65dfa640b691.tar.gz | |
QPID-6074: [Java Broker] BDB HA VHN implement to use context variables rather than system properties to control timeouts etc
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1622846 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
4 files changed, 69 insertions, 18 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java index ae0d4b13ae..18b9727a61 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java @@ -35,4 +35,5 @@ public interface ReplicatedEnvironmentConfiguration extends StandardEnvironmentC int getQuorumOverride(); Map<String, String> getReplicationParameters(); String getHelperNodeName(); + int getFacadeParameter(String parameterName, int defaultValue); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index ef583e04dc..4cd677586a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -95,19 +95,44 @@ import org.apache.qpid.server.util.DaemonThreadFactory; public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener { public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval"; - public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.replication.db_ping_socket_timeout"; + public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout"; public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.remote_node_monitor_interval"; + public static final String ENVIRONMENT_RESTART_RETRY_LIMIT_PROPERTY_NAME = "qpid.bdb.ha.environment_restart_retry_limit"; + public static final String EXECUTOR_SHUTDOWN_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.executor_shutdown_timeout"; private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class); private static final int DEFAULT_MASTER_TRANSFER_TIMEOUT = 1000 * 60; private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000; private static final int DEFAULT_REMOTE_NODE_MONITOR_INTERVAL = 1000; + private static final int DEFAULT_ENVIRONMENT_RESTART_RETRY_LIMIT = 3; + private static final int DEFAULT_EXECUTOR_SHUTDOWN_TIMEOUT = 5000; - private static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT); - public static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT); - private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL); - private static final int RESTART_TRY_LIMIT = 3; + /** Length of time allowed for a master transfer to complete before the operation will timeout */ + private final int _masterTransferTimeout; + + /** + * Qpid monitors the health of the other nodes in the group. This controls the length of time + * between each scan of the all the nodes. + */ + private final int _remoteNodeMonitorInterval; + + /** + * Qpid uses DbPing to establish the health of other nodes in the group. This controls the socket timeout + * (so_timeout) used on the socket underlying DbPing. + */ + private final int _dbPingSocketTimeout; + + /** + * If the environment creation fails, Qpid will automatically retry. This controls the number + * of times recreation will be attempted. + */ + private final int _environmentRestartRetryLimit; + + /** + * Length of time for executors used by facade to shutdown gracefully + */ + private final int _executorShutdownTimeout; static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.SYNC; static final SyncPolicy REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC; @@ -191,6 +216,13 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } _configuration = configuration; + + _masterTransferTimeout = configuration.getFacadeParameter(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT); + _dbPingSocketTimeout = configuration.getFacadeParameter(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT); + _remoteNodeMonitorInterval = configuration.getFacadeParameter(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL); + _environmentRestartRetryLimit = configuration.getFacadeParameter(ENVIRONMENT_RESTART_RETRY_LIMIT_PROPERTY_NAME, DEFAULT_ENVIRONMENT_RESTART_RETRY_LIMIT); + _executorShutdownTimeout = configuration.getFacadeParameter(EXECUTOR_SHUTDOWN_TIMEOUT_PROPERTY_NAME, DEFAULT_EXECUTOR_SHUTDOWN_TIMEOUT); + _defaultDurability = new Durability(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY); _prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName(); @@ -319,10 +351,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan executorService.shutdown(); try { - boolean wasShutdown = executorService.awaitTermination(5000, TimeUnit.MILLISECONDS); + boolean wasShutdown = executorService.awaitTermination(_executorShutdownTimeout, TimeUnit.MILLISECONDS); if (!wasShutdown) { - LOGGER.warn("Executor service " + executorService + " did not shutdown within allowed time period, ignoring"); + LOGGER.warn("Executor service " + executorService + + " did not shutdown within allowed time period " + _executorShutdownTimeout + + ", ignoring"); } } catch (InterruptedException e) @@ -368,7 +402,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public void run() { - for (int i = 0; i < RESTART_TRY_LIMIT; i++) + for (int i = 0; i < _environmentRestartRetryLimit; i++) { try { @@ -757,7 +791,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan try { ReplicationGroupAdmin admin = createReplicationGroupAdmin(); - String newMaster = admin.transferMaster(Collections.singleton(nodeName), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true); + String newMaster = admin.transferMaster(Collections.singleton(nodeName), + _masterTransferTimeout, TimeUnit.MILLISECONDS, true); if (LOGGER.isDebugEnabled()) { LOGGER.debug("The mastership has been transferred to " + newMaster); @@ -766,7 +801,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan catch (DatabaseException e) { LOGGER.warn("Exception on transferring the mastership to " + _prettyGroupNodeName - + " Master transfer timeout : " + MASTER_TRANSFER_TIMEOUT, e); + + " Master transfer timeout : " + _masterTransferTimeout, e); throw e; } return null; @@ -1192,13 +1227,13 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return Collections.unmodifiableSet(_permittedNodes); } - public static NodeState getRemoteNodeState(String groupName, ReplicationNode repNode) throws IOException, ServiceConnectFailedException + static NodeState getRemoteNodeState(String groupName, ReplicationNode repNode, int dbPingSocketTimeout) throws IOException, ServiceConnectFailedException { if (repNode == null) { throw new IllegalArgumentException("Node cannot be null"); } - return new DbPing(repNode, groupName, DB_PING_SOCKET_TIMEOUT).getNodeState(); + return new DbPing(repNode, groupName, dbPingSocketTimeout).getNodeState(); } public static Set<String> convertApplicationStateBytesToPermittedNodeList(byte[] applicationState) @@ -1220,7 +1255,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - public static Collection<String> connectToHelperNodeAndCheckPermittedHosts(String nodeName, String hostPort, String groupName, String helperNodeName, String helperHostPort) + public static Collection<String> connectToHelperNodeAndCheckPermittedHosts(String nodeName, String hostPort, String groupName, String helperNodeName, String helperHostPort, int dbPingSocketTimeout) { if (LOGGER.isDebugEnabled()) { @@ -1237,7 +1272,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan try { ReplicationNodeImpl node = new ReplicationNodeImpl(helperNodeName, helperHostPort); - NodeState state = getRemoteNodeState(groupName, node); + NodeState state = getRemoteNodeState(groupName, node, dbPingSocketTimeout); byte[] applicationState = state.getAppState(); permittedNodes = convertApplicationStateBytesToPermittedNodeList(applicationState); } @@ -1390,7 +1425,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan State state = _state.get(); if (state != State.CLOSED && state != State.CLOSING && continueMonitoring) { - _groupChangeExecutor.schedule(this, REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS); + _groupChangeExecutor.schedule(this, _remoteNodeMonitorInterval, TimeUnit.MILLISECONDS); } else { @@ -1491,7 +1526,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan NodeState nodeStateObject = null; try { - nodeStateObject = getRemoteNodeState((String)_configuration.getGroupName(), node); + nodeStateObject = getRemoteNodeState(_configuration.getGroupName(), node, _dbPingSocketTimeout); } catch (IOException | ServiceConnectFailedException | com.sleepycat.je.rep.utilint.BinaryProtocol.ProtocolException e ) { @@ -1509,7 +1544,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { try { - future.get(REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS); + future.get(_remoteNodeMonitorInterval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index 109bfe6978..09d681608f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -107,6 +107,20 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact { return settings.getGroupName(); } + + @Override + public int getFacadeParameter(final String parameterName, final int defaultValue) + { + if (parent.getContextKeys(false).contains(parameterName)) + { + return parent.getContextValue(Integer.class, parameterName); + } + else + { + return defaultValue; + } + } + }; return new ReplicatedEnvironmentFacade(configuration); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 90537bfbb4..b9f0764f67 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -284,7 +284,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { try { - Collection<String> permittedNodes = ReplicatedEnvironmentFacade.connectToHelperNodeAndCheckPermittedHosts(getName(), getAddress(), getGroupName(), getHelperNodeName(), getHelperAddress()); + int dbPingSocketTimeout = getContextKeys(false).contains("qpid.bdb.ha.db_ping_socket_timeout") ? getContextValue(Integer.class, "qpid.bdb.ha.db_ping_socket_timeout") : 10000 /* JE's own default */; + Collection<String> permittedNodes = ReplicatedEnvironmentFacade.connectToHelperNodeAndCheckPermittedHosts(getName(), getAddress(), getGroupName(), getHelperNodeName(), getHelperAddress(), dbPingSocketTimeout); setAttribute(PERMITTED_NODES, null, new ArrayList<String>(permittedNodes)); } catch(IllegalConfigurationException e) |
