diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-06-23 21:54:04 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-06-23 21:54:04 +0000 |
| commit | 882fd35c3a84e4f2f6b84d1bccf324936de3f3fe (patch) | |
| tree | d705bd89ae36cd203892ae042eb3ab09840974db /qpid/java/bdbstore/src | |
| parent | ca64819bc020aa10d8959ca1216029828fead06f (diff) | |
| download | qpid-python-882fd35c3a84e4f2f6b84d1bccf324936de3f3fe.tar.gz | |
QPID-5715: Set BDB message store durability only once on opening of virtual host. Restore original code of coalescing committer.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1604946 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
10 files changed, 114 insertions, 273 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java index 90caa85ad5..6550a9122e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java @@ -22,9 +22,6 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; @@ -36,90 +33,55 @@ import com.sleepycat.je.Transaction; public class CoalescingCommiter implements Committer { - private final CommitTask _commitTask; - private final ExecutorService _taskExecutor; + private final CommitThread _commitThread; - public CoalescingCommiter(final String name, EnvironmentFacade environmentFacade) + public CoalescingCommiter(String name, EnvironmentFacade environmentFacade) { - _commitTask = new CommitTask(environmentFacade); - _taskExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() - { - @Override - public Thread newThread(Runnable r) - { - Thread t = new Thread(r, "Commit-Thread-" + name); - t.setDaemon(true); - return t; - } - }); + _commitThread = new CommitThread("Commit-Thread-" + name, environmentFacade); } @Override public void start() { - if (_commitTask.start()) - { - _taskExecutor.submit(_commitTask); - } + _commitThread.start(); } @Override public void stop() { - _commitTask.stop(); - } - - @Override - public void close() - { + _commitThread.close(); try { - _commitTask.close(); + _commitThread.join(); } - finally + catch (InterruptedException ie) { - _taskExecutor.shutdown(); + Thread.currentThread().interrupt(); + throw new RuntimeException("Commit thread has not shutdown", ie); } } @Override public StoreFuture commit(Transaction tx, boolean syncCommit) { - if (isStarted()) - { - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitTask, tx, syncCommit); - try - { - commitFuture.commit(); - return commitFuture; - } - catch(IllegalStateException e) - { - // IllegalStateException is thrown when commit thread is stopped whilst commit is called - } - } - - return StoreFuture.IMMEDIATE_FUTURE; - } - - public boolean isStarted() - { - return !_commitTask.isStopped(); + BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + commitFuture.commit(); + return commitFuture; } private static final class BDBCommitFuture implements StoreFuture { private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); - private final CommitTask _commitTask; + private final CommitThread _commitThread; private final Transaction _tx; private final boolean _syncCommit; private RuntimeException _databaseException; private boolean _complete; - public BDBCommitFuture(CommitTask commitTask, Transaction tx, boolean syncCommit) + public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) { - _commitTask = commitTask; + _commitThread = commitThread; _tx = tx; _syncCommit = syncCommit; } @@ -145,7 +107,7 @@ public class CoalescingCommiter implements Committer public void commit() throws DatabaseException { - _commitTask.addJob(this, _syncCommit); + _commitThread.addJob(this, _syncCommit); if(!_syncCommit) { @@ -180,7 +142,7 @@ public class CoalescingCommiter implements Committer while (!isComplete()) { - _commitTask.explicitNotify(); + _commitThread.explicitNotify(); try { wait(250); @@ -189,24 +151,6 @@ public class CoalescingCommiter implements Committer { throw new RuntimeException(e); } - - if (!_commitTask.isClosed() && _commitTask.isStopped() && !isComplete()) - { - // coalesing sync is not required anymore - // flush log and mark transaction as completed - try - { - _commitTask.flushLog(); - } - catch(DatabaseException e) - { - _databaseException = e; - } - finally - { - complete(); - } - } } if(LOGGER.isDebugEnabled()) @@ -218,38 +162,28 @@ public class CoalescingCommiter implements Committer } /** - * Implements a {@link Runnable} which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before * continuing, but it is the responsibility of this thread to tell the commit operations when they have been * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. * * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> */ - private static class CommitTask implements Runnable + private static class CommitThread extends Thread { - private static final Logger LOGGER = Logger.getLogger(CommitTask.class); + private static final Logger LOGGER = Logger.getLogger(CommitThread.class); - private final AtomicBoolean _stopped = new AtomicBoolean(true); - private final AtomicBoolean _closed = new AtomicBoolean(false); + private final AtomicBoolean _stopped = new AtomicBoolean(false); private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); private final Object _lock = new Object(); private final EnvironmentFacade _environmentFacade; - public CommitTask(EnvironmentFacade environmentFacade) + public CommitThread(String name, EnvironmentFacade environmentFacade) { + super(name); _environmentFacade = environmentFacade; } - public boolean isClosed() - { - return _closed.get(); - } - - public boolean isStopped() - { - return _stopped.get(); - } - public void explicitNotify() { synchronized (_lock) @@ -258,7 +192,6 @@ public class CoalescingCommiter implements Committer } } - @Override public void run() { while (!_stopped.get()) @@ -280,12 +213,6 @@ public class CoalescingCommiter implements Committer } processJobs(); } - - // process remaining jobs if such were added whilst stopped - if (hasJobs()) - { - processJobs(); - } } private void processJobs() @@ -300,7 +227,11 @@ public class CoalescingCommiter implements Committer startTime = System.currentTimeMillis(); } - flushLog(); + Environment environment = _environmentFacade.getEnvironment(); + if (environment != null && environment.isValid()) + { + environment.flushLog(true); + } if(LOGGER.isDebugEnabled()) { @@ -351,15 +282,6 @@ public class CoalescingCommiter implements Committer } } - private void flushLog() - { - Environment environment = _environmentFacade.getEnvironment(); - if (environment != null && environment.isValid()) - { - environment.flushLog(true); - } - } - private boolean hasJobs() { return !_jobQueue.isEmpty(); @@ -381,44 +303,24 @@ public class CoalescingCommiter implements Committer } } - public boolean start() - { - return _stopped.compareAndSet(true, false); - } - - public void stop() + public void close() { - if (_stopped.compareAndSet(false, true)) + RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted"); + synchronized (_lock) { - synchronized (_lock) + _stopped.set(true); + BDBCommitFuture commit = null; + int abortedCommits = 0; + while ((commit = _jobQueue.poll()) != null) { - _lock.notifyAll(); + abortedCommits++; + commit.abort(e); } - _jobQueue.clear(); - } - } - - public void close() - { - if (_closed.compareAndSet(false, true)) - { - RuntimeException e = new RuntimeException("Commit thread has been closed"); - synchronized (_lock) + if (LOGGER.isDebugEnabled() && abortedCommits > 0) { - _stopped.set(true); - BDBCommitFuture commit = null; - int abortedCommits = 0; - while ((commit = _jobQueue.poll()) != null) - { - abortedCommits++; - commit.abort(e); - } - if (LOGGER.isDebugEnabled() && abortedCommits > 0) - { - LOGGER.debug(abortedCommits + " commit(s) were aborted during close."); - } - _lock.notifyAll(); + LOGGER.debug(abortedCommits + " commit(s) were aborted during close."); } + _lock.notifyAll(); } } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java index bc9771f463..01e45d8ac5 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java @@ -31,8 +31,4 @@ public interface Committer StoreFuture commit(Transaction tx, boolean syncCommit); void stop(); - - void close(); - - boolean isStarted(); }
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index 6a377babdf..c8f2250566 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -30,7 +30,6 @@ import com.sleepycat.je.Sequence; import com.sleepycat.je.SequenceConfig; import org.apache.log4j.Logger; -import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.StoreFuture; import com.sleepycat.je.Database; @@ -129,10 +128,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade { try { - if (_committer != null) - { - _committer.close(); - } + _committer.stop(); closeSequences(); closeDatabases(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index b0cac5fad9..dff5fc372d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -104,7 +104,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.SYNC; static final SyncPolicy REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC; - static final ReplicaAckPolicy REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY = ReplicaAckPolicy.SIMPLE_MAJORITY; + public static final ReplicaAckPolicy REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY = ReplicaAckPolicy.SIMPLE_MAJORITY; @SuppressWarnings("serial") private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() @@ -155,13 +155,13 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>(); private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>(); private final Durability _defaultDurability; - private final CoalescingCommiter _coalescingCommiter; + private final AtomicReference<Durability> _messageStoreDurability = new AtomicReference<Durability>(); + private volatile Durability _realMessageStoreDurability = null; + private volatile CoalescingCommiter _coalescingCommiter = null; private volatile ReplicatedEnvironment _environment; private volatile long _joinTime; private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; - private volatile SyncPolicy _messageStoreLocalTransactionSyncronizationPolicy = LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY; - private volatile SyncPolicy _messageStoreRemoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY; private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>(); private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>(); @@ -190,7 +190,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName)); _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName)); - _coalescingCommiter = new CoalescingCommiter(_configuration.getGroupName(), this); // create environment in a separate thread to avoid renaming of the current thread by JE _environment = createEnvironment(true); @@ -201,10 +200,15 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public Transaction beginTransaction() { + if (_messageStoreDurability.get() == null) + { + throw new IllegalStateException("Message store durability is not set"); + } + try { TransactionConfig transactionConfig = new TransactionConfig(); - transactionConfig.setDurability(getMessageStoreTransactionDurability()); + transactionConfig.setDurability(getRealMessageStoreDurability()); return _environment.beginTransaction(null, transactionConfig); } catch(DatabaseException e) @@ -220,13 +224,19 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { // Using commit() instead of commitNoSync() for the HA store to allow // the HA durability configuration to influence resulting behaviour. - tx.commit(); + tx.commit(_realMessageStoreDurability); } catch (DatabaseException de) { throw handleDatabaseException("Got DatabaseException on commit, closing environment", de); } - return _coalescingCommiter.commit(tx, syncCommit); + + if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC + && _messageStoreDurability.get().getLocalSync() == SyncPolicy.SYNC) + { + return _coalescingCommiter.commit(tx, syncCommit); + } + return StoreFuture.IMMEDIATE_FUTURE; } @Override @@ -248,7 +258,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan try { - _coalescingCommiter.close(); + if (_coalescingCommiter != null) + { + _coalescingCommiter.stop(); + } closeSequences(); closeDatabases(); } @@ -506,26 +519,19 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return (String)_configuration.getHelperHostPort(); } - Durability getMessageStoreTransactionDurability() + Durability getRealMessageStoreDurability() { - SyncPolicy localSync = getMessageStoreLocalTransactionSyncronizationPolicy(); - if ( localSync == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY && _coalescingCommiter.isStarted()) - { - localSync = SyncPolicy.NO_SYNC; - } - SyncPolicy replicaSync = getMessageStoreRemoteTransactionSyncronizationPolicy(); - return new Durability(localSync, replicaSync, getReplicaAcknowledgmentPolicy()); + return _realMessageStoreDurability; } - public Durability getDurability() + public Durability getMessageStoreDurability() { - return new Durability(getMessageStoreLocalTransactionSyncronizationPolicy(), - getMessageStoreRemoteTransactionSyncronizationPolicy(), getReplicaAcknowledgmentPolicy()); + return _messageStoreDurability.get(); } public boolean isCoalescingSync() { - return _coalescingCommiter.isStarted(); + return _coalescingCommiter != null; } public String getNodeState() @@ -1087,39 +1093,24 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - public SyncPolicy getMessageStoreLocalTransactionSyncronizationPolicy() - { - return _messageStoreLocalTransactionSyncronizationPolicy; - } - - public SyncPolicy getMessageStoreRemoteTransactionSyncronizationPolicy() - { - return _messageStoreRemoteTransactionSyncronizationPolicy; - } - - public ReplicaAckPolicy getReplicaAcknowledgmentPolicy() - { - return REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY; - } - - public void setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy localTransactionSyncronizationPolicy) + public void setMessageStoreDurability(SyncPolicy localTransactionSynchronizationPolicy, SyncPolicy remoteTransactionSynchronizationPolicy, ReplicaAckPolicy replicaAcknowledgmentPolicy) { - _messageStoreLocalTransactionSyncronizationPolicy = localTransactionSyncronizationPolicy; - if (localTransactionSyncronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY) + if (_messageStoreDurability.compareAndSet(null, new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy ))) { - _coalescingCommiter.start(); + if (localTransactionSynchronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY) + { + localTransactionSynchronizationPolicy = SyncPolicy.NO_SYNC; + _coalescingCommiter = new CoalescingCommiter(_configuration.getGroupName(), this); + _coalescingCommiter.start(); + } + _realMessageStoreDurability = new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy); } else { - _coalescingCommiter.stop(); + throw new IllegalStateException("Message store durability is already set to " + _messageStoreDurability.get()); } } - public void setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy remoteTransactionSyncronizationPolicy) - { - _messageStoreRemoteTransactionSyncronizationPolicy = remoteTransactionSyncronizationPolicy; - } - private void populateExistingRemoteReplicationNodes() { ReplicationGroup group = _environment.getGroup(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java index 7182cb10af..8cecbe172e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java @@ -31,7 +31,7 @@ public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends Virtual String REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = "remoteTransactionSynchronizationPolicy"; String LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = "localTransactionSynchronizationPolicy"; String COALESCING_SYNC = "coalescingSync"; - String REPLICA_ACKNOWLEDGMENT_POLICY = "replicaAcknowledgmentPolicy"; + String DURABILITY = "durability"; @ManagedAttribute( defaultValue = "SYNC") String getLocalTransactionSynchronizationPolicy(); @@ -40,8 +40,8 @@ public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends Virtual String getRemoteTransactionSynchronizationPolicy(); @DerivedAttribute - String getReplicaAcknowledgmentPolicy(); + boolean isCoalescingSync(); @DerivedAttribute - boolean isCoalescingSync(); + String getDurability(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java index 6ba0a4dd39..b45d471960 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java @@ -42,10 +42,10 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm private final BDBConfigurationStore _configurationStore; - @ManagedAttributeField(afterSet="setLocalTransactionSynchronizationPolicyOnEnvironment") + @ManagedAttributeField private String _localTransactionSynchronizationPolicy; - @ManagedAttributeField(afterSet="setRemoteTransactionSynchronizationPolicyOnEnvironment") + @ManagedAttributeField private String _remoteTransactionSynchronizationPolicy; @ManagedObjectFactoryConstructor @@ -74,14 +74,13 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm return _remoteTransactionSynchronizationPolicy; } - @Override - public String getReplicaAcknowledgmentPolicy() + public String getDurability() { ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); if (facade != null) { - return facade.getReplicaAcknowledgmentPolicy().name(); + return String.valueOf(facade.getMessageStoreDurability()); } return null; } @@ -89,20 +88,21 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm @Override public boolean isCoalescingSync() { - ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); - if (facade != null) - { - return facade.isCoalescingSync(); - } - return false; + return _localTransactionSynchronizationPolicy.equals(SyncPolicy.SYNC.name()); } @Override public void onOpen() { + ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); + if (facade != null) + { + facade.setMessageStoreDurability( + SyncPolicy.valueOf(getLocalTransactionSynchronizationPolicy()), + SyncPolicy.valueOf(getRemoteTransactionSynchronizationPolicy()), + ReplicatedEnvironmentFacade.REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY); + } super.onOpen(); - setRemoteTransactionSynchronizationPolicyOnEnvironment(); - setLocalTransactionSynchronizationPolicyOnEnvironment(); } @Override @@ -135,24 +135,6 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm } } - protected void setLocalTransactionSynchronizationPolicyOnEnvironment() - { - ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); - if (facade != null) - { - facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.valueOf(getLocalTransactionSynchronizationPolicy())); - } - } - - protected void setRemoteTransactionSynchronizationPolicyOnEnvironment() - { - ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); - if (facade != null) - { - facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.valueOf(getRemoteTransactionSynchronizationPolicy())); - } - } - private ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade() { return (ReplicatedEnvironmentFacade) _configurationStore.getEnvironmentFacade(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java index b230b7520e..fbf3041054 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java @@ -48,9 +48,6 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends @ManagedAttribute(mandatory=true) String getHelperAddress(); - @DerivedAttribute - String getDurability(); - @ManagedAttribute(defaultValue = "false") boolean isDesignatedPrimary(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 90737a9385..74d273a660 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -161,17 +161,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @Override - public String getDurability() - { - ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); - if (environmentFacade != null) - { - return environmentFacade.getDurability().toString(); - } - return null; - } - - @Override public boolean isDesignatedPrimary() { return _designatedPrimary; diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index f9bdaeda93..80f6e7ea49 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -477,7 +477,9 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC"); virtualHost.setAttributes(virtualHostAttributes); - awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, false); + virtualHost.stop(); + virtualHost.start(); + assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSynchronizationPolicy()); assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy()); assertFalse("CoalescingSync is not OFF", virtualHost.isCoalescingSync()); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index ec5098f369..86d47a6a8b 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -40,7 +40,6 @@ import org.apache.qpid.util.FileUtils; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.Durability; -import com.sleepycat.je.Durability.SyncPolicy; import com.sleepycat.je.Environment; import com.sleepycat.je.Transaction; import com.sleepycat.je.rep.NodeState; @@ -163,18 +162,24 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort()); } - public void testGetDurability() throws Exception + public void testSetMessageStoreDurability() throws Exception { ReplicatedEnvironmentFacade master = createMaster(); - assertEquals("Unexpected message store durability", TEST_DURABILITY, master.getMessageStoreTransactionDurability()); - assertEquals("Unexpected durability", TEST_DURABILITY, master.getDurability()); - assertFalse("Coalescing syn before policy set to SYNC", master.isCoalescingSync()); - master.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.SYNC); - assertTrue("Coalescing syn after policy set to SYNC", master.isCoalescingSync()); - assertEquals("Unexpected message store durability after committer start", "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", master.getMessageStoreTransactionDurability().toString()); - master.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC); - assertEquals("Unexpected message store durability after committer stop", "WRITE_NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", master.getMessageStoreTransactionDurability().toString()); - assertFalse("Coalescing syn after policy set to WRITE_NO_SYNC", master.isCoalescingSync()); + assertEquals("Unexpected message store durability", + new Durability(Durability.SyncPolicy.NO_SYNC, Durability.SyncPolicy.NO_SYNC, Durability.ReplicaAckPolicy.SIMPLE_MAJORITY), + master.getRealMessageStoreDurability()); + assertEquals("Unexpected durability", TEST_DURABILITY, master.getMessageStoreDurability()); + assertTrue("Unexpected coalescing syn", master.isCoalescingSync()); + + try + { + master.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), TEST_DURABILITY.getReplicaSync(), TEST_DURABILITY.getReplicaAck()); + fail("Cannot set message store durability twice"); + } + catch(IllegalStateException e) + { + // pass + } } public void testGetNodeState() throws Exception @@ -619,26 +624,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState()); } - public void testSetLocalTransactionSyncronizationPolicy() throws Exception - { - ReplicatedEnvironmentFacade facade = createMaster(); - assertEquals("Unexpected local transaction synchronization policy before change", - ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreLocalTransactionSyncronizationPolicy()); - facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC); - assertEquals("Unexpected local transaction synchronization policy after change", - SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreLocalTransactionSyncronizationPolicy()); - } - - public void testSetRemoteTransactionSyncronizationPolicy() throws Exception - { - ReplicatedEnvironmentFacade facade = createMaster(); - assertEquals("Unexpected remote transaction synchronization policy before change", - ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreRemoteTransactionSyncronizationPolicy()); - facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.WRITE_NO_SYNC); - assertEquals("Unexpected remote transaction synchronization policy after change", - SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreRemoteTransactionSyncronizationPolicy()); - } - public void testBeginTransaction() throws Exception { ReplicatedEnvironmentFacade facade = createMaster(); @@ -696,6 +681,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config); ref.setStateChangeListener(stateChangeListener); ref.setReplicationGroupListener(replicationGroupListener); + ref.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), TEST_DURABILITY.getReplicaSync(), TEST_DURABILITY.getReplicaAck()); _nodes.put(nodeName, ref); return ref; } |
