summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java14
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java31
2 files changed, 41 insertions, 4 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
index be4b8199c3..0dff8417ca 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
@@ -29,9 +29,12 @@ import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Transaction;
+import org.apache.log4j.Logger;
public class DatabasePinger
{
+ private static final Logger LOGGER = Logger.getLogger(DatabasePinger.class);
+
public static final String PING_DATABASE_NAME = "PINGDB";
private static final DatabaseConfig DATABASE_CONFIG =
DatabaseConfig.DEFAULT.setAllowCreate(true).setTransactional(true);
@@ -41,6 +44,11 @@ public class DatabasePinger
{
try
{
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Beginning ping transaction");
+ }
+
final Database db = facade.openDatabase(PING_DATABASE_NAME,
DATABASE_CONFIG);
@@ -55,10 +63,16 @@ public class DatabasePinger
txn = facade.beginTransaction();
db.put(txn, key, value);
txn.commit();
+
txn = null;
}
finally
{
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Ping transaction completed");
+ }
+
if (txn != null)
{
txn.abort();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index fa916a1316..55349e9759 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -1352,6 +1352,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private class RemoteNodeStateLearner implements Callable<Void>
{
private Map<String, ReplicatedEnvironment.State> _previousGroupState = Collections.emptyMap();
+ private boolean _previousDesignatedPrimary;
+ private int _previousElectableGroupOverride;
@Override
public Void call()
@@ -1372,9 +1374,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
if (continueMonitoring)
{
+ boolean currentDesignatedPrimary = isDesignatedPrimary();
+ int currentElectableGroupSizeOverride = getElectableGroupSizeOverride();
+
Map<ReplicationNode, NodeState> nodeStates = discoverNodeStates(_remoteReplicationNodes.values());
- executeDatabasePingerOnNodeChangesIfMaster(nodeStates);
+ executeDatabasePingerOnNodeChangesIfMaster(nodeStates, currentDesignatedPrimary, currentElectableGroupSizeOverride);
notifyGroupListenerAboutNodeStates(nodeStates);
}
@@ -1523,11 +1528,19 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return nodeStates;
}
- private void executeDatabasePingerOnNodeChangesIfMaster(final Map<ReplicationNode, NodeState> nodeStates)
+ /**
+ * If the state of the group changes or the user alters the parameters used to determine if the
+ * there is quorum in the group, execute a single small transaction to discover is quorum is
+ * still available. This allows us to discover if quorum is lost in a timely manner, rather than
+ * having to await the next user transaction.
+ */
+ private void executeDatabasePingerOnNodeChangesIfMaster(final Map<ReplicationNode, NodeState> nodeStates,
+ final boolean currentDesignatedPrimary,
+ final int currentElectableGroupSizeOverride)
{
if (ReplicatedEnvironment.State.MASTER == _environment.getState())
{
- Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<String, ReplicatedEnvironment.State>();
+ Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<>();
for (Map.Entry<ReplicationNode, NodeState> entry : nodeStates.entrySet())
{
ReplicationNode node = entry.getKey();
@@ -1535,8 +1548,18 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
ReplicatedEnvironment.State state = nodeState == null? ReplicatedEnvironment.State.UNKNOWN : nodeState.getNodeState();
currentGroupState.put(node.getName(), state);
}
- boolean stateChanged = !_previousGroupState.equals(currentGroupState);
+
+ ReplicatedEnvironmentFacade.this.isDesignatedPrimary();
+ ReplicatedEnvironmentFacade.this.getElectableGroupSizeOverride();
+
+ boolean stateChanged = !_previousGroupState.equals(currentGroupState)
+ || currentDesignatedPrimary != _previousDesignatedPrimary
+ || currentElectableGroupSizeOverride != _previousElectableGroupOverride;
+
_previousGroupState = currentGroupState;
+ _previousDesignatedPrimary = currentDesignatedPrimary;
+ _previousElectableGroupOverride = currentElectableGroupSizeOverride;
+
if (stateChanged && State.OPEN == _state.get())
{
new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this);