diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-07-21 15:42:32 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-07-21 15:42:32 +0000 |
| commit | c940f617018832135605f59645f2f85e7b9dca39 (patch) | |
| tree | 05e3ec9add61ddd01a4fa32d7cc339a7d145af92 /qpid/java/bdbstore/src | |
| parent | 47893288853bd36080ac30c73e63599845ac58f0 (diff) | |
| download | qpid-python-c940f617018832135605f59645f2f85e7b9dca39.tar.gz | |
QPID-5909: Allow setting of BDB HA message store durability many times
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1612322 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
2 files changed, 25 insertions, 19 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index c1a851c7a2..e7874c317a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -163,13 +163,13 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>(); private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>(); private final Durability _defaultDurability; - private final AtomicReference<Durability> _messageStoreDurability = new AtomicReference<Durability>(); private volatile Durability _realMessageStoreDurability = null; private volatile CoalescingCommiter _coalescingCommiter = null; private volatile ReplicatedEnvironment _environment; private volatile long _joinTime; private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; + private volatile Durability _messageStoreDurability; private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>(); private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>(); @@ -208,7 +208,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public Transaction beginTransaction() { - if (_messageStoreDurability.get() == null) + if (_messageStoreDurability == null) { throw new IllegalStateException("Message store durability is not set"); } @@ -240,7 +240,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC - && _messageStoreDurability.get().getLocalSync() == SyncPolicy.SYNC) + && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC) { return _coalescingCommiter.commit(tx, syncCommit); } @@ -545,7 +545,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public Durability getMessageStoreDurability() { - return _messageStoreDurability.get(); + return _messageStoreDurability; } public boolean isCoalescingSync() @@ -1105,10 +1105,24 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + /** + * This method should only be invoked from configuration thread on virtual host activation. + * Otherwise, invocation of this method whilst coalescing committer is committing transactions might result in transaction aborts. + */ public void setMessageStoreDurability(SyncPolicy localTransactionSynchronizationPolicy, SyncPolicy remoteTransactionSynchronizationPolicy, ReplicaAckPolicy replicaAcknowledgmentPolicy) { - if (_messageStoreDurability.compareAndSet(null, new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy ))) + if (_messageStoreDurability == null || localTransactionSynchronizationPolicy != _messageStoreDurability.getLocalSync() + || remoteTransactionSynchronizationPolicy != _messageStoreDurability.getReplicaSync() + || replicaAcknowledgmentPolicy != _messageStoreDurability.getReplicaAck()) { + _messageStoreDurability = new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy); + + if (_coalescingCommiter != null) + { + _coalescingCommiter.stop(); + _coalescingCommiter = null; + } + if (localTransactionSynchronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY) { localTransactionSynchronizationPolicy = SyncPolicy.NO_SYNC; @@ -1117,10 +1131,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } _realMessageStoreDurability = new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy); } - else - { - throw new IllegalStateException("Message store durability is already set to " + _messageStoreDurability.get()); - } } public void setPermittedNodes(Collection<String> permittedNodes) diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 355ebcb981..229c2c3f27 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -176,17 +176,13 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase new Durability(Durability.SyncPolicy.NO_SYNC, Durability.SyncPolicy.NO_SYNC, Durability.ReplicaAckPolicy.SIMPLE_MAJORITY), master.getRealMessageStoreDurability()); assertEquals("Unexpected durability", TEST_DURABILITY, master.getMessageStoreDurability()); - assertTrue("Unexpected coalescing syn", master.isCoalescingSync()); + assertTrue("Unexpected coalescing sync", master.isCoalescingSync()); - try - { - master.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), TEST_DURABILITY.getReplicaSync(), TEST_DURABILITY.getReplicaAck()); - fail("Cannot set message store durability twice"); - } - catch(IllegalStateException e) - { - // pass - } + master.setMessageStoreDurability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL); + assertEquals("Unexpected message store durability", + new Durability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL), + master.getRealMessageStoreDurability()); + assertFalse("Coalescing sync committer is still running", master.isCoalescingSync()); } public void testGetNodeState() throws Exception |
