summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/test
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-10-21 16:11:49 +0000
committerKeith Wall <kwall@apache.org>2014-10-21 16:11:49 +0000
commitcfb1b1056e35892c04fbdafd486913bba5054587 (patch)
treec045267a61aa7cb2b4299d1ba3275157aec782b6 /qpid/java/bdbstore/src/test
parentf9b8cebdef0951eb643e6dbd6a41b3f2a70c5104 (diff)
downloadqpid-python-cfb1b1056e35892c04fbdafd486913bba5054587.tar.gz
QPID-6154: [Java Broker] HA - Handle rollback of node when use of weak durability has allowed nodes to diverge
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1633407 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/test')
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java93
1 files changed, 93 insertions, 0 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index bef5a87217..ed468c0844 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -33,6 +33,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.StringBinding;
+import com.sleepycat.je.DatabaseEntry;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -743,6 +746,91 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertTrue("Intruder node was not detected", intruderLatch.await(10, TimeUnit.SECONDS));
}
+ public void testNodeRolledback() throws Exception
+ {
+ DatabaseConfig createConfig = new DatabaseConfig();
+ createConfig.setAllowCreate(true);
+ createConfig.setTransactional(true);
+
+ ReplicatedEnvironmentFacade node1 = createMaster();
+
+ String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable();
+
+ String replicaName = TEST_NODE_NAME + 1;
+ ReplicatedEnvironmentFacade node2 = createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener());
+
+ node1.setDesignatedPrimary(true);
+
+ Transaction txn = node1.beginTransaction();
+ Database db = node1.getEnvironment().openDatabase(txn, "mydb", createConfig);
+ txn.commit();
+
+ // Put a record (that will be replicated)
+ putRecord(node1, db, 1, "value1");
+
+ node2.close();
+
+ // Put a record (that will be only on node1 as node2 is now offline)
+ putRecord(node1, db, 2, "value2");
+
+ db.close();
+
+ // Stop node1
+ node1.close();
+
+ // Restart the node2, making it primary so it becomes master
+ TestStateChangeListener node2StateChangeListener = new TestStateChangeListener(State.MASTER);
+ node2 = addNode(replicaName, replicaNodeHostPort, true, node2StateChangeListener, new NoopReplicationGroupListener());
+ boolean awaitForStateChange = node2StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS);
+ assertTrue(replicaName + " did not go into desired state; current actual state is "
+ + node2StateChangeListener.getCurrentActualState(), awaitForStateChange);
+
+ txn = node2.beginTransaction();
+ db = node2.getEnvironment().openDatabase(txn, "mydb", DatabaseConfig.DEFAULT);
+ txn.commit();
+
+ // Do a transaction on node2. The two environments will have diverged
+ putRecord(node2, db, 3, "diverged");
+
+ // Now restart node1 and ensure that it realises it needs to rollback before it can rejoin.
+ TestStateChangeListener node1StateChangeListener = new TestStateChangeListener(State.REPLICA);
+ final CountDownLatch _replicaRolledback = new CountDownLatch(1);
+ node1 = addNode(node1StateChangeListener, new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onNodeRolledback()
+ {
+ _replicaRolledback.countDown();
+ }
+ });
+ assertTrue("Node 1 did not go into desired state",
+ node1StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+ assertTrue("Node 1 did not experience rollback within timeout",
+ _replicaRolledback.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ // Finally do one more transaction through the master
+ putRecord(node2, db, 4, "value4");
+ db.close();
+
+ node1.close();
+ node2.close();
+ }
+
+ private void putRecord(final ReplicatedEnvironmentFacade master, final Database db, final int keyValue,
+ final String dataValue)
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+
+ Transaction txn = master.beginTransaction();
+ IntegerBinding.intToEntry(keyValue, key);
+ StringBinding.stringToEntry(dataValue, data);
+
+ db.put(txn, key, data);
+ txn.commit();
+ }
+
+
private void createIntruder(String nodeName, String node1NodeHostPort)
{
File environmentPathFile = new File(_storePath, nodeName);
@@ -883,5 +971,10 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
}
+ @Override
+ public void onNodeRolledback()
+ {
+ }
+
}
}