diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-06-03 08:25:16 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-06-03 08:25:16 +0000 |
| commit | f418f3407dd6e080d2c4234e75ab6813f2ace4ea (patch) | |
| tree | bd7069626e803a3432812f447e08b05a801e44f0 /qpid/java/bdbstore/src/test | |
| parent | 0cf0c4e013554eff08e542f6f5ce64cc062ada2e (diff) | |
| download | qpid-python-f418f3407dd6e080d2c4234e75ab6813f2ace4ea.tar.gz | |
QPID-5715: Use coalescing sync committer for message store transactions only when syncronization policy SYNC is set for local transactions in BDB HA virtual host
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1599445 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/test')
2 files changed, 51 insertions, 18 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index 563665917c..962e19f81c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -174,7 +174,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertEquals(nodeHostPort, replicationConfig.getNodeHostPort()); assertEquals(helperHostPort, replicationConfig.getHelperHosts()); - assertEquals("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString()); + assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString()); assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT)); assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS)); @@ -465,17 +465,21 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase BDBHAVirtualHostImpl virtualHost = (BDBHAVirtualHostImpl)node.getVirtualHost(); assertNotNull("Virtual host is not created", virtualHost); - assertEquals("Unexpected local transaction synchronization policy", "NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy()); - assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy()); + awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, true); + + assertEquals("Unexpected local transaction synchronization policy", "SYNC", virtualHost.getLocalTransactionSynchronizationPolicy()); + assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy()); + assertTrue("CoalescingSync is not ON", virtualHost.isCoalescingSync()); Map<String, Object> virtualHostAttributes = new HashMap<String,Object>(); virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "WRITE_NO_SYNC"); virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC"); virtualHost.setAttributes(virtualHostAttributes); - assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy()); - assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy()); - + awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, false); + assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSynchronizationPolicy()); + assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy()); + assertFalse("CoalescingSync is not OFF", virtualHost.isCoalescingSync()); try { virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID")); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 3660566c8c..e4f1b62954 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.qpid.server.store.berkeleydb.Committer; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestFileUtils; @@ -62,9 +63,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase private static final String TEST_NODE_NAME = "testNodeName"; private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT; private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT; - private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString(); + private static final Durability TEST_DURABILITY = Durability.parse("SYNC,NO_SYNC,SIMPLE_MAJORITY"); private static final boolean TEST_DESIGNATED_PRIMARY = false; - private static final boolean TEST_COALESCING_SYNC = true; private static final int TEST_PRIORITY = 1; private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0; @@ -185,12 +185,32 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase public void testGetDurability() throws Exception { - assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability().toString()); + ReplicatedEnvironmentFacade master = createMaster(); + assertEquals("Unexpected message store durability", TEST_DURABILITY, master.getMessageStoreTransactionDurability()); + assertEquals("Unexpected durability", TEST_DURABILITY, master.getDurability()); + Committer committer = master.createCommitter(TEST_GROUP_NAME); + committer.start(); + + waitForCommitter(committer, true); + + assertEquals("Unexpected message store durability after committer start", "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", master.getMessageStoreTransactionDurability().toString()); + + committer.stop(); + waitForCommitter(committer, false); + assertEquals("Unexpected message store durability after committer stop", TEST_DURABILITY, master.getMessageStoreTransactionDurability()); } public void testIsCoalescingSync() throws Exception { - assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, createMaster().isCoalescingSync()); + ReplicatedEnvironmentFacade master = createMaster(); + assertEquals("Unexpected coalescing sync", false, master.isCoalescingSync()); + Committer committer = master.createCommitter(TEST_GROUP_NAME); + committer.start(); + waitForCommitter(committer, true); + assertEquals("Unexpected coalescing sync", true, master.isCoalescingSync()); + committer.stop(); + waitForCommitter(committer, false); + assertEquals("Unexpected coalescing sync", false, master.isCoalescingSync()); } public void testGetNodeState() throws Exception @@ -690,20 +710,20 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { ReplicatedEnvironmentFacade facade = createMaster(); assertEquals("Unexpected local transaction synchronization policy before change", - ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getLocalTransactionSyncronizationPolicy()); - facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC); + ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreLocalTransactionSyncronizationPolicy()); + facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC); assertEquals("Unexpected local transaction synchronization policy after change", - SyncPolicy.WRITE_NO_SYNC, facade.getLocalTransactionSyncronizationPolicy()); + SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreLocalTransactionSyncronizationPolicy()); } public void testSetRemoteTransactionSyncronizationPolicy() throws Exception { ReplicatedEnvironmentFacade facade = createMaster(); assertEquals("Unexpected remote transaction synchronization policy before change", - ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getRemoteTransactionSyncronizationPolicy()); - facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC); + ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreRemoteTransactionSyncronizationPolicy()); + facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.WRITE_NO_SYNC); assertEquals("Unexpected remote transaction synchronization policy after change", - SyncPolicy.WRITE_NO_SYNC, facade.getRemoteTransactionSyncronizationPolicy()); + SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreRemoteTransactionSyncronizationPolicy()); } public void testBeginTransaction() throws Exception @@ -780,6 +800,17 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase return dbConfig; } + private void waitForCommitter(Committer committer, boolean expected) throws InterruptedException + { + int counter = 0; + while(committer.isStarted() != expected && counter < 100) + { + Thread.sleep(20); + counter++; + } + assertEquals("Committer is not in expected state", expected, committer.isStarted()); + } + private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary) { ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class); @@ -790,8 +821,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase when(node.getPriority()).thenReturn(TEST_PRIORITY); when(node.getGroupName()).thenReturn(TEST_GROUP_NAME); when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT); - when(node.getDurability()).thenReturn(TEST_DURABILITY); - when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC); Map<String, String> repConfig = new HashMap<String, String>(); repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s"); |
