summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-07-21 15:42:32 +0000
committerAlex Rudyy <orudyy@apache.org>2014-07-21 15:42:32 +0000
commitc940f617018832135605f59645f2f85e7b9dca39 (patch)
tree05e3ec9add61ddd01a4fa32d7cc339a7d145af92 /qpid/java/bdbstore/src
parent47893288853bd36080ac30c73e63599845ac58f0 (diff)
downloadqpid-python-c940f617018832135605f59645f2f85e7b9dca39.tar.gz
QPID-5909: Allow setting of BDB HA message store durability many times
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1612322 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java28
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java16
2 files changed, 25 insertions, 19 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index c1a851c7a2..e7874c317a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -163,13 +163,13 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
private final Durability _defaultDurability;
- private final AtomicReference<Durability> _messageStoreDurability = new AtomicReference<Durability>();
private volatile Durability _realMessageStoreDurability = null;
private volatile CoalescingCommiter _coalescingCommiter = null;
private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
+ private volatile Durability _messageStoreDurability;
private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
@@ -208,7 +208,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public Transaction beginTransaction()
{
- if (_messageStoreDurability.get() == null)
+ if (_messageStoreDurability == null)
{
throw new IllegalStateException("Message store durability is not set");
}
@@ -240,7 +240,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
- && _messageStoreDurability.get().getLocalSync() == SyncPolicy.SYNC)
+ && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
{
return _coalescingCommiter.commit(tx, syncCommit);
}
@@ -545,7 +545,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public Durability getMessageStoreDurability()
{
- return _messageStoreDurability.get();
+ return _messageStoreDurability;
}
public boolean isCoalescingSync()
@@ -1105,10 +1105,24 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ /**
+ * This method should only be invoked from configuration thread on virtual host activation.
+ * Otherwise, invocation of this method whilst coalescing committer is committing transactions might result in transaction aborts.
+ */
public void setMessageStoreDurability(SyncPolicy localTransactionSynchronizationPolicy, SyncPolicy remoteTransactionSynchronizationPolicy, ReplicaAckPolicy replicaAcknowledgmentPolicy)
{
- if (_messageStoreDurability.compareAndSet(null, new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy )))
+ if (_messageStoreDurability == null || localTransactionSynchronizationPolicy != _messageStoreDurability.getLocalSync()
+ || remoteTransactionSynchronizationPolicy != _messageStoreDurability.getReplicaSync()
+ || replicaAcknowledgmentPolicy != _messageStoreDurability.getReplicaAck())
{
+ _messageStoreDurability = new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
+
+ if (_coalescingCommiter != null)
+ {
+ _coalescingCommiter.stop();
+ _coalescingCommiter = null;
+ }
+
if (localTransactionSynchronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)
{
localTransactionSynchronizationPolicy = SyncPolicy.NO_SYNC;
@@ -1117,10 +1131,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
_realMessageStoreDurability = new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
}
- else
- {
- throw new IllegalStateException("Message store durability is already set to " + _messageStoreDurability.get());
- }
}
public void setPermittedNodes(Collection<String> permittedNodes)
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 355ebcb981..229c2c3f27 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -176,17 +176,13 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
new Durability(Durability.SyncPolicy.NO_SYNC, Durability.SyncPolicy.NO_SYNC, Durability.ReplicaAckPolicy.SIMPLE_MAJORITY),
master.getRealMessageStoreDurability());
assertEquals("Unexpected durability", TEST_DURABILITY, master.getMessageStoreDurability());
- assertTrue("Unexpected coalescing syn", master.isCoalescingSync());
+ assertTrue("Unexpected coalescing sync", master.isCoalescingSync());
- try
- {
- master.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), TEST_DURABILITY.getReplicaSync(), TEST_DURABILITY.getReplicaAck());
- fail("Cannot set message store durability twice");
- }
- catch(IllegalStateException e)
- {
- // pass
- }
+ master.setMessageStoreDurability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL);
+ assertEquals("Unexpected message store durability",
+ new Durability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL),
+ master.getRealMessageStoreDurability());
+ assertFalse("Coalescing sync committer is still running", master.isCoalescingSync());
}
public void testGetNodeState() throws Exception