diff options
| author | Keith Wall <kwall@apache.org> | 2014-10-21 16:11:49 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-10-21 16:11:49 +0000 |
| commit | cfb1b1056e35892c04fbdafd486913bba5054587 (patch) | |
| tree | c045267a61aa7cb2b4299d1ba3275157aec782b6 /qpid/java/bdbstore/src/test | |
| parent | f9b8cebdef0951eb643e6dbd6a41b3f2a70c5104 (diff) | |
| download | qpid-python-cfb1b1056e35892c04fbdafd486913bba5054587.tar.gz | |
QPID-6154: [Java Broker] HA - Handle rollback of node when use of weak durability has allowed nodes to diverge
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1633407 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/test')
| -rw-r--r-- | qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java | 93 |
1 files changed, 93 insertions, 0 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index bef5a87217..ed468c0844 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -33,6 +33,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import com.sleepycat.bind.tuple.IntegerBinding; +import com.sleepycat.bind.tuple.StringBinding; +import com.sleepycat.je.DatabaseEntry; import org.apache.log4j.Logger; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -743,6 +746,91 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertTrue("Intruder node was not detected", intruderLatch.await(10, TimeUnit.SECONDS)); } + public void testNodeRolledback() throws Exception + { + DatabaseConfig createConfig = new DatabaseConfig(); + createConfig.setAllowCreate(true); + createConfig.setTransactional(true); + + ReplicatedEnvironmentFacade node1 = createMaster(); + + String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable(); + + String replicaName = TEST_NODE_NAME + 1; + ReplicatedEnvironmentFacade node2 = createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener()); + + node1.setDesignatedPrimary(true); + + Transaction txn = node1.beginTransaction(); + Database db = node1.getEnvironment().openDatabase(txn, "mydb", createConfig); + txn.commit(); + + // Put a record (that will be replicated) + putRecord(node1, db, 1, "value1"); + + node2.close(); + + // Put a record (that will be only on node1 as node2 is now offline) + putRecord(node1, db, 2, "value2"); + + db.close(); + + // Stop node1 + node1.close(); + + // Restart the node2, making it primary so it becomes master + TestStateChangeListener node2StateChangeListener = new TestStateChangeListener(State.MASTER); + node2 = addNode(replicaName, replicaNodeHostPort, true, node2StateChangeListener, new NoopReplicationGroupListener()); + boolean awaitForStateChange = node2StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS); + assertTrue(replicaName + " did not go into desired state; current actual state is " + + node2StateChangeListener.getCurrentActualState(), awaitForStateChange); + + txn = node2.beginTransaction(); + db = node2.getEnvironment().openDatabase(txn, "mydb", DatabaseConfig.DEFAULT); + txn.commit(); + + // Do a transaction on node2. The two environments will have diverged + putRecord(node2, db, 3, "diverged"); + + // Now restart node1 and ensure that it realises it needs to rollback before it can rejoin. + TestStateChangeListener node1StateChangeListener = new TestStateChangeListener(State.REPLICA); + final CountDownLatch _replicaRolledback = new CountDownLatch(1); + node1 = addNode(node1StateChangeListener, new NoopReplicationGroupListener() + { + @Override + public void onNodeRolledback() + { + _replicaRolledback.countDown(); + } + }); + assertTrue("Node 1 did not go into desired state", + node1StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + assertTrue("Node 1 did not experience rollback within timeout", + _replicaRolledback.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + // Finally do one more transaction through the master + putRecord(node2, db, 4, "value4"); + db.close(); + + node1.close(); + node2.close(); + } + + private void putRecord(final ReplicatedEnvironmentFacade master, final Database db, final int keyValue, + final String dataValue) + { + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry data = new DatabaseEntry(); + + Transaction txn = master.beginTransaction(); + IntegerBinding.intToEntry(keyValue, key); + StringBinding.stringToEntry(dataValue, data); + + db.put(txn, key, data); + txn.commit(); + } + + private void createIntruder(String nodeName, String node1NodeHostPort) { File environmentPathFile = new File(_storePath, nodeName); @@ -883,5 +971,10 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { } + @Override + public void onNodeRolledback() + { + } + } } |
