diff options
| author | Keith Wall <kwall@apache.org> | 2014-10-21 16:11:49 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-10-21 16:11:49 +0000 |
| commit | cfb1b1056e35892c04fbdafd486913bba5054587 (patch) | |
| tree | c045267a61aa7cb2b4299d1ba3275157aec782b6 /qpid/java | |
| parent | f9b8cebdef0951eb643e6dbd6a41b3f2a70c5104 (diff) | |
| download | qpid-python-cfb1b1056e35892c04fbdafd486913bba5054587.tar.gz | |
QPID-6154: [Java Broker] HA - Handle rollback of node when use of weak durability has allowed nodes to diverge
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1633407 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
6 files changed, 175 insertions, 22 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 8877f6a156..d5425019a8 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -53,27 +53,12 @@ import com.sleepycat.je.Durability.ReplicaAckPolicy; import com.sleepycat.je.Durability.SyncPolicy; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.EnvironmentFailureException; +import com.sleepycat.je.ExceptionEvent; import com.sleepycat.je.Sequence; import com.sleepycat.je.SequenceConfig; import com.sleepycat.je.Transaction; import com.sleepycat.je.TransactionConfig; -import com.sleepycat.je.rep.AppStateMonitor; -import com.sleepycat.je.rep.InsufficientAcksException; -import com.sleepycat.je.rep.InsufficientLogException; -import com.sleepycat.je.rep.InsufficientReplicasException; -import com.sleepycat.je.rep.NetworkRestore; -import com.sleepycat.je.rep.NetworkRestoreConfig; -import com.sleepycat.je.rep.NodeState; -import com.sleepycat.je.rep.NodeType; -import com.sleepycat.je.rep.RepInternal; -import com.sleepycat.je.rep.ReplicatedEnvironment; -import com.sleepycat.je.rep.ReplicationConfig; -import com.sleepycat.je.rep.ReplicationGroup; -import com.sleepycat.je.rep.ReplicationMutableConfig; -import com.sleepycat.je.rep.ReplicationNode; -import com.sleepycat.je.rep.RestartRequiredException; -import com.sleepycat.je.rep.StateChangeEvent; -import com.sleepycat.je.rep.StateChangeListener; +import com.sleepycat.je.rep.*; import com.sleepycat.je.rep.util.DbPing; import com.sleepycat.je.rep.util.ReplicationGroupAdmin; import com.sleepycat.je.rep.utilint.HostPortPair; @@ -90,7 +75,6 @@ import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; -import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.DaemonThreadFactory; @@ -200,6 +184,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private volatile long _joinTime; private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; private volatile long _envSetupTimeoutMillis; + /** Flag set true when JE need to discard transactions in order to rejoin the group */ + private volatile boolean _nodeRolledback; public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration) { @@ -410,7 +396,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private void tryToRestartEnvironment(final DatabaseException dbe) { - if (_state.compareAndSet(State.OPEN, State.RESTARTING)) + if (_state.compareAndSet(State.OPEN, State.RESTARTING) || _state.compareAndSet(State.OPENING, State.RESTARTING)) { if (dbe != null && LOGGER.isDebugEnabled()) { @@ -1100,7 +1086,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); envConfig.setTransactional(true); - envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); + envConfig.setExceptionListener(new ExceptionListener()); envConfig.setDurability(_defaultDurability); for (Map.Entry<String, String> configItem : environmentParameters.entrySet()) @@ -1229,6 +1215,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan if (_replicationGroupListener.compareAndSet(null, replicationGroupListener)) { notifyExistingRemoteReplicationNodes(replicationGroupListener); + notifyNodeRolledbackIfNecessary(replicationGroupListener); } else { @@ -1459,6 +1446,15 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + private void notifyNodeRolledbackIfNecessary(ReplicationGroupListener listener) + { + if (_nodeRolledback) + { + listener.onNodeRolledback(); + _nodeRolledback = false; + } + } + private class RemoteNodeStateLearner implements Callable<Void> { private Map<String, ReplicatedEnvironment.State> _previousGroupState = Collections.emptyMap(); @@ -1491,7 +1487,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan executeDatabasePingerOnNodeChangesIfMaster(nodeStates, currentDesignatedPrimary, currentElectableGroupSizeOverride); - notifyGroupListenerAboutNodeStates(nodeStates); + notifyGroupListenerAboutNodeStates(nodeStates); } } } @@ -1776,4 +1772,28 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan '}'; } } + + private class ExceptionListener implements com.sleepycat.je.ExceptionListener + { + @Override + public void exceptionThrown(final ExceptionEvent event) + { + if (event.getException() instanceof RollbackException) + { + // Usually caused use of weak durability options: node priority zero, + // designated primary, electable group override. + RollbackException re = (RollbackException) event.getException(); + + LOGGER.warn(_prettyGroupNodeName + " has transaction(s) ahead of the current master. These" + + " must be discarded to allow this node to rejoin the group." + + " This condition is normally caused by the use of weak durability options."); + _nodeRolledback = true; + tryToRestartEnvironment(re); + } + else + { + LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException()); + } + } + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java index cf6050c3cd..896295708a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java @@ -55,4 +55,9 @@ public interface ReplicationGroupListener void onNoMajority(); + /** + * Signifies that node need to discard one or more transactions in order to rejoin the group. Most likely + * caused by use of the weak durability options such as node priority zero. + */ + void onNodeRolledback(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index db2e1277c7..39cac8fdc6 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -1108,6 +1108,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.QUORUM_LOST()); } + @Override + public void onNodeRolledback() + { + getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.NODE_ROLLEDBACK()); + } + private Map<String, Object> nodeToAttributes(ReplicationNode replicationNode) { Map<String, Object> attributes = new HashMap<String, Object>(); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index bef5a87217..ed468c0844 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -33,6 +33,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import com.sleepycat.bind.tuple.IntegerBinding; +import com.sleepycat.bind.tuple.StringBinding; +import com.sleepycat.je.DatabaseEntry; import org.apache.log4j.Logger; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -743,6 +746,91 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertTrue("Intruder node was not detected", intruderLatch.await(10, TimeUnit.SECONDS)); } + public void testNodeRolledback() throws Exception + { + DatabaseConfig createConfig = new DatabaseConfig(); + createConfig.setAllowCreate(true); + createConfig.setTransactional(true); + + ReplicatedEnvironmentFacade node1 = createMaster(); + + String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable(); + + String replicaName = TEST_NODE_NAME + 1; + ReplicatedEnvironmentFacade node2 = createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener()); + + node1.setDesignatedPrimary(true); + + Transaction txn = node1.beginTransaction(); + Database db = node1.getEnvironment().openDatabase(txn, "mydb", createConfig); + txn.commit(); + + // Put a record (that will be replicated) + putRecord(node1, db, 1, "value1"); + + node2.close(); + + // Put a record (that will be only on node1 as node2 is now offline) + putRecord(node1, db, 2, "value2"); + + db.close(); + + // Stop node1 + node1.close(); + + // Restart the node2, making it primary so it becomes master + TestStateChangeListener node2StateChangeListener = new TestStateChangeListener(State.MASTER); + node2 = addNode(replicaName, replicaNodeHostPort, true, node2StateChangeListener, new NoopReplicationGroupListener()); + boolean awaitForStateChange = node2StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS); + assertTrue(replicaName + " did not go into desired state; current actual state is " + + node2StateChangeListener.getCurrentActualState(), awaitForStateChange); + + txn = node2.beginTransaction(); + db = node2.getEnvironment().openDatabase(txn, "mydb", DatabaseConfig.DEFAULT); + txn.commit(); + + // Do a transaction on node2. The two environments will have diverged + putRecord(node2, db, 3, "diverged"); + + // Now restart node1 and ensure that it realises it needs to rollback before it can rejoin. + TestStateChangeListener node1StateChangeListener = new TestStateChangeListener(State.REPLICA); + final CountDownLatch _replicaRolledback = new CountDownLatch(1); + node1 = addNode(node1StateChangeListener, new NoopReplicationGroupListener() + { + @Override + public void onNodeRolledback() + { + _replicaRolledback.countDown(); + } + }); + assertTrue("Node 1 did not go into desired state", + node1StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + assertTrue("Node 1 did not experience rollback within timeout", + _replicaRolledback.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + // Finally do one more transaction through the master + putRecord(node2, db, 4, "value4"); + db.close(); + + node1.close(); + node2.close(); + } + + private void putRecord(final ReplicatedEnvironmentFacade master, final Database db, final int keyValue, + final String dataValue) + { + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry data = new DatabaseEntry(); + + Transaction txn = master.beginTransaction(); + IntegerBinding.intToEntry(keyValue, key); + StringBinding.stringToEntry(dataValue, data); + + db.put(txn, key, data); + txn.commit(); + } + + private void createIntruder(String nodeName, String node1NodeHostPort) { File environmentPathFile = new File(_storePath, nodeName); @@ -883,5 +971,10 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { } + @Override + public void onNodeRolledback() + { + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java index 2234ce6b74..ee29ef6796 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java @@ -57,6 +57,7 @@ public class HighAvailabilityMessages public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.deleted"; public static final String ROLE_CHANGED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.role_changed"; public static final String DESIGNATED_PRIMARY_CHANGED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.designated_primary_changed"; + public static final String NODE_ROLLEDBACK_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.node_rolledback"; static { @@ -74,6 +75,7 @@ public class HighAvailabilityMessages Logger.getLogger(DELETED_LOG_HIERARCHY); Logger.getLogger(ROLE_CHANGED_LOG_HIERARCHY); Logger.getLogger(DESIGNATED_PRIMARY_CHANGED_LOG_HIERARCHY); + Logger.getLogger(NODE_ROLLEDBACK_LOG_HIERARCHY); _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.HighAvailability_logmessages", _currentLocale); } @@ -479,6 +481,33 @@ public class HighAvailabilityMessages }; } + /** + * Log a HighAvailability message of the Format: + * <pre>HA-1014 : Diverged transactions discarded</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage NODE_ROLLEDBACK() + { + String rawMessage = _messages.getString("NODE_ROLLEDBACK"); + + final String message = rawMessage; + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return NODE_ROLLEDBACK_LOG_HIERARCHY; + } + }; + } + private HighAvailabilityMessages() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties index 1a6bff5353..15e08bdc5a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties @@ -61,4 +61,4 @@ PRIORITY_CHANGED = HA-1012 : Priority : {0} # 0 - new value DESIGNATED_PRIMARY_CHANGED = HA-1013 : Designated primary : {0} - +NODE_ROLLEDBACK = HA-1014 : Diverged transactions discarded |
