diff options
Diffstat (limited to 'qpid/java')
2 files changed, 41 insertions, 4 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java index be4b8199c3..0dff8417ca 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java @@ -29,9 +29,12 @@ import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Transaction; +import org.apache.log4j.Logger; public class DatabasePinger { + private static final Logger LOGGER = Logger.getLogger(DatabasePinger.class); + public static final String PING_DATABASE_NAME = "PINGDB"; private static final DatabaseConfig DATABASE_CONFIG = DatabaseConfig.DEFAULT.setAllowCreate(true).setTransactional(true); @@ -41,6 +44,11 @@ public class DatabasePinger { try { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Beginning ping transaction"); + } + final Database db = facade.openDatabase(PING_DATABASE_NAME, DATABASE_CONFIG); @@ -55,10 +63,16 @@ public class DatabasePinger txn = facade.beginTransaction(); db.put(txn, key, value); txn.commit(); + txn = null; } finally { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Ping transaction completed"); + } + if (txn != null) { txn.abort(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index fa916a1316..55349e9759 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -1352,6 +1352,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private class RemoteNodeStateLearner implements Callable<Void> { private Map<String, ReplicatedEnvironment.State> _previousGroupState = Collections.emptyMap(); + private boolean _previousDesignatedPrimary; + private int _previousElectableGroupOverride; @Override public Void call() @@ -1372,9 +1374,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan if (continueMonitoring) { + boolean currentDesignatedPrimary = isDesignatedPrimary(); + int currentElectableGroupSizeOverride = getElectableGroupSizeOverride(); + Map<ReplicationNode, NodeState> nodeStates = discoverNodeStates(_remoteReplicationNodes.values()); - executeDatabasePingerOnNodeChangesIfMaster(nodeStates); + executeDatabasePingerOnNodeChangesIfMaster(nodeStates, currentDesignatedPrimary, currentElectableGroupSizeOverride); notifyGroupListenerAboutNodeStates(nodeStates); } @@ -1523,11 +1528,19 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return nodeStates; } - private void executeDatabasePingerOnNodeChangesIfMaster(final Map<ReplicationNode, NodeState> nodeStates) + /** + * If the state of the group changes or the user alters the parameters used to determine if the + * there is quorum in the group, execute a single small transaction to discover is quorum is + * still available. This allows us to discover if quorum is lost in a timely manner, rather than + * having to await the next user transaction. + */ + private void executeDatabasePingerOnNodeChangesIfMaster(final Map<ReplicationNode, NodeState> nodeStates, + final boolean currentDesignatedPrimary, + final int currentElectableGroupSizeOverride) { if (ReplicatedEnvironment.State.MASTER == _environment.getState()) { - Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<String, ReplicatedEnvironment.State>(); + Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<>(); for (Map.Entry<ReplicationNode, NodeState> entry : nodeStates.entrySet()) { ReplicationNode node = entry.getKey(); @@ -1535,8 +1548,18 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan ReplicatedEnvironment.State state = nodeState == null? ReplicatedEnvironment.State.UNKNOWN : nodeState.getNodeState(); currentGroupState.put(node.getName(), state); } - boolean stateChanged = !_previousGroupState.equals(currentGroupState); + + ReplicatedEnvironmentFacade.this.isDesignatedPrimary(); + ReplicatedEnvironmentFacade.this.getElectableGroupSizeOverride(); + + boolean stateChanged = !_previousGroupState.equals(currentGroupState) + || currentDesignatedPrimary != _previousDesignatedPrimary + || currentElectableGroupSizeOverride != _previousElectableGroupOverride; + _previousGroupState = currentGroupState; + _previousDesignatedPrimary = currentDesignatedPrimary; + _previousElectableGroupOverride = currentElectableGroupSizeOverride; + if (stateChanged && State.OPEN == _state.get()) { new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this); |
