summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/test
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-06-03 08:25:16 +0000
committerAlex Rudyy <orudyy@apache.org>2014-06-03 08:25:16 +0000
commitf418f3407dd6e080d2c4234e75ab6813f2ace4ea (patch)
treebd7069626e803a3432812f447e08b05a801e44f0 /qpid/java/bdbstore/src/test
parent0cf0c4e013554eff08e542f6f5ce64cc062ada2e (diff)
downloadqpid-python-f418f3407dd6e080d2c4234e75ab6813f2ace4ea.tar.gz
QPID-5715: Use coalescing sync committer for message store transactions only when syncronization policy SYNC is set for local transactions in BDB HA virtual host
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1599445 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/test')
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java16
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java53
2 files changed, 51 insertions, 18 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index 563665917c..962e19f81c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -174,7 +174,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertEquals(nodeHostPort, replicationConfig.getNodeHostPort());
assertEquals(helperHostPort, replicationConfig.getHelperHosts());
- assertEquals("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString());
+ assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString());
assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS));
@@ -465,17 +465,21 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
BDBHAVirtualHostImpl virtualHost = (BDBHAVirtualHostImpl)node.getVirtualHost();
assertNotNull("Virtual host is not created", virtualHost);
- assertEquals("Unexpected local transaction synchronization policy", "NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy());
- assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy());
+ awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, true);
+
+ assertEquals("Unexpected local transaction synchronization policy", "SYNC", virtualHost.getLocalTransactionSynchronizationPolicy());
+ assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy());
+ assertTrue("CoalescingSync is not ON", virtualHost.isCoalescingSync());
Map<String, Object> virtualHostAttributes = new HashMap<String,Object>();
virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "WRITE_NO_SYNC");
virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC");
virtualHost.setAttributes(virtualHostAttributes);
- assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy());
- assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy());
-
+ awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, false);
+ assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSynchronizationPolicy());
+ assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy());
+ assertFalse("CoalescingSync is not OFF", virtualHost.isCoalescingSync());
try
{
virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID"));
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 3660566c8c..e4f1b62954 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.qpid.server.store.berkeleydb.Committer;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -62,9 +63,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
private static final String TEST_NODE_NAME = "testNodeName";
private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
- private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString();
+ private static final Durability TEST_DURABILITY = Durability.parse("SYNC,NO_SYNC,SIMPLE_MAJORITY");
private static final boolean TEST_DESIGNATED_PRIMARY = false;
- private static final boolean TEST_COALESCING_SYNC = true;
private static final int TEST_PRIORITY = 1;
private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0;
@@ -185,12 +185,32 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
public void testGetDurability() throws Exception
{
- assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability().toString());
+ ReplicatedEnvironmentFacade master = createMaster();
+ assertEquals("Unexpected message store durability", TEST_DURABILITY, master.getMessageStoreTransactionDurability());
+ assertEquals("Unexpected durability", TEST_DURABILITY, master.getDurability());
+ Committer committer = master.createCommitter(TEST_GROUP_NAME);
+ committer.start();
+
+ waitForCommitter(committer, true);
+
+ assertEquals("Unexpected message store durability after committer start", "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", master.getMessageStoreTransactionDurability().toString());
+
+ committer.stop();
+ waitForCommitter(committer, false);
+ assertEquals("Unexpected message store durability after committer stop", TEST_DURABILITY, master.getMessageStoreTransactionDurability());
}
public void testIsCoalescingSync() throws Exception
{
- assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, createMaster().isCoalescingSync());
+ ReplicatedEnvironmentFacade master = createMaster();
+ assertEquals("Unexpected coalescing sync", false, master.isCoalescingSync());
+ Committer committer = master.createCommitter(TEST_GROUP_NAME);
+ committer.start();
+ waitForCommitter(committer, true);
+ assertEquals("Unexpected coalescing sync", true, master.isCoalescingSync());
+ committer.stop();
+ waitForCommitter(committer, false);
+ assertEquals("Unexpected coalescing sync", false, master.isCoalescingSync());
}
public void testGetNodeState() throws Exception
@@ -690,20 +710,20 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
ReplicatedEnvironmentFacade facade = createMaster();
assertEquals("Unexpected local transaction synchronization policy before change",
- ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getLocalTransactionSyncronizationPolicy());
- facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
+ ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreLocalTransactionSyncronizationPolicy());
+ facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
assertEquals("Unexpected local transaction synchronization policy after change",
- SyncPolicy.WRITE_NO_SYNC, facade.getLocalTransactionSyncronizationPolicy());
+ SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreLocalTransactionSyncronizationPolicy());
}
public void testSetRemoteTransactionSyncronizationPolicy() throws Exception
{
ReplicatedEnvironmentFacade facade = createMaster();
assertEquals("Unexpected remote transaction synchronization policy before change",
- ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getRemoteTransactionSyncronizationPolicy());
- facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
+ ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreRemoteTransactionSyncronizationPolicy());
+ facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.WRITE_NO_SYNC);
assertEquals("Unexpected remote transaction synchronization policy after change",
- SyncPolicy.WRITE_NO_SYNC, facade.getRemoteTransactionSyncronizationPolicy());
+ SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreRemoteTransactionSyncronizationPolicy());
}
public void testBeginTransaction() throws Exception
@@ -780,6 +800,17 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
return dbConfig;
}
+ private void waitForCommitter(Committer committer, boolean expected) throws InterruptedException
+ {
+ int counter = 0;
+ while(committer.isStarted() != expected && counter < 100)
+ {
+ Thread.sleep(20);
+ counter++;
+ }
+ assertEquals("Committer is not in expected state", expected, committer.isStarted());
+ }
+
private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary)
{
ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class);
@@ -790,8 +821,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
when(node.getPriority()).thenReturn(TEST_PRIORITY);
when(node.getGroupName()).thenReturn(TEST_GROUP_NAME);
when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT);
- when(node.getDurability()).thenReturn(TEST_DURABILITY);
- when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC);
Map<String, String> repConfig = new HashMap<String, String>();
repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");