summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-06-23 21:54:04 +0000
committerAlex Rudyy <orudyy@apache.org>2014-06-23 21:54:04 +0000
commit882fd35c3a84e4f2f6b84d1bccf324936de3f3fe (patch)
treed705bd89ae36cd203892ae042eb3ab09840974db /qpid/java/bdbstore/src
parentca64819bc020aa10d8959ca1216029828fead06f (diff)
downloadqpid-python-882fd35c3a84e4f2f6b84d1bccf324936de3f3fe.tar.gz
QPID-5715: Set BDB message store durability only once on opening of virtual host. Restore original code of coalescing committer.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1604946 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java178
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java83
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java44
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java3
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java11
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java4
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java48
10 files changed, 114 insertions, 273 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
index 90caa85ad5..6550a9122e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -22,9 +22,6 @@ package org.apache.qpid.server.store.berkeleydb;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
@@ -36,90 +33,55 @@ import com.sleepycat.je.Transaction;
public class CoalescingCommiter implements Committer
{
- private final CommitTask _commitTask;
- private final ExecutorService _taskExecutor;
+ private final CommitThread _commitThread;
- public CoalescingCommiter(final String name, EnvironmentFacade environmentFacade)
+ public CoalescingCommiter(String name, EnvironmentFacade environmentFacade)
{
- _commitTask = new CommitTask(environmentFacade);
- _taskExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
- {
- @Override
- public Thread newThread(Runnable r)
- {
- Thread t = new Thread(r, "Commit-Thread-" + name);
- t.setDaemon(true);
- return t;
- }
- });
+ _commitThread = new CommitThread("Commit-Thread-" + name, environmentFacade);
}
@Override
public void start()
{
- if (_commitTask.start())
- {
- _taskExecutor.submit(_commitTask);
- }
+ _commitThread.start();
}
@Override
public void stop()
{
- _commitTask.stop();
- }
-
- @Override
- public void close()
- {
+ _commitThread.close();
try
{
- _commitTask.close();
+ _commitThread.join();
}
- finally
+ catch (InterruptedException ie)
{
- _taskExecutor.shutdown();
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Commit thread has not shutdown", ie);
}
}
@Override
public StoreFuture commit(Transaction tx, boolean syncCommit)
{
- if (isStarted())
- {
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitTask, tx, syncCommit);
- try
- {
- commitFuture.commit();
- return commitFuture;
- }
- catch(IllegalStateException e)
- {
- // IllegalStateException is thrown when commit thread is stopped whilst commit is called
- }
- }
-
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- public boolean isStarted()
- {
- return !_commitTask.isStopped();
+ BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ commitFuture.commit();
+ return commitFuture;
}
private static final class BDBCommitFuture implements StoreFuture
{
private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
- private final CommitTask _commitTask;
+ private final CommitThread _commitThread;
private final Transaction _tx;
private final boolean _syncCommit;
private RuntimeException _databaseException;
private boolean _complete;
- public BDBCommitFuture(CommitTask commitTask, Transaction tx, boolean syncCommit)
+ public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
{
- _commitTask = commitTask;
+ _commitThread = commitThread;
_tx = tx;
_syncCommit = syncCommit;
}
@@ -145,7 +107,7 @@ public class CoalescingCommiter implements Committer
public void commit() throws DatabaseException
{
- _commitTask.addJob(this, _syncCommit);
+ _commitThread.addJob(this, _syncCommit);
if(!_syncCommit)
{
@@ -180,7 +142,7 @@ public class CoalescingCommiter implements Committer
while (!isComplete())
{
- _commitTask.explicitNotify();
+ _commitThread.explicitNotify();
try
{
wait(250);
@@ -189,24 +151,6 @@ public class CoalescingCommiter implements Committer
{
throw new RuntimeException(e);
}
-
- if (!_commitTask.isClosed() && _commitTask.isStopped() && !isComplete())
- {
- // coalesing sync is not required anymore
- // flush log and mark transaction as completed
- try
- {
- _commitTask.flushLog();
- }
- catch(DatabaseException e)
- {
- _databaseException = e;
- }
- finally
- {
- complete();
- }
- }
}
if(LOGGER.isDebugEnabled())
@@ -218,38 +162,28 @@ public class CoalescingCommiter implements Committer
}
/**
- * Implements a {@link Runnable} which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
* completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
*/
- private static class CommitTask implements Runnable
+ private static class CommitThread extends Thread
{
- private static final Logger LOGGER = Logger.getLogger(CommitTask.class);
+ private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
- private final AtomicBoolean _stopped = new AtomicBoolean(true);
- private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final AtomicBoolean _stopped = new AtomicBoolean(false);
private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
private final Object _lock = new Object();
private final EnvironmentFacade _environmentFacade;
- public CommitTask(EnvironmentFacade environmentFacade)
+ public CommitThread(String name, EnvironmentFacade environmentFacade)
{
+ super(name);
_environmentFacade = environmentFacade;
}
- public boolean isClosed()
- {
- return _closed.get();
- }
-
- public boolean isStopped()
- {
- return _stopped.get();
- }
-
public void explicitNotify()
{
synchronized (_lock)
@@ -258,7 +192,6 @@ public class CoalescingCommiter implements Committer
}
}
- @Override
public void run()
{
while (!_stopped.get())
@@ -280,12 +213,6 @@ public class CoalescingCommiter implements Committer
}
processJobs();
}
-
- // process remaining jobs if such were added whilst stopped
- if (hasJobs())
- {
- processJobs();
- }
}
private void processJobs()
@@ -300,7 +227,11 @@ public class CoalescingCommiter implements Committer
startTime = System.currentTimeMillis();
}
- flushLog();
+ Environment environment = _environmentFacade.getEnvironment();
+ if (environment != null && environment.isValid())
+ {
+ environment.flushLog(true);
+ }
if(LOGGER.isDebugEnabled())
{
@@ -351,15 +282,6 @@ public class CoalescingCommiter implements Committer
}
}
- private void flushLog()
- {
- Environment environment = _environmentFacade.getEnvironment();
- if (environment != null && environment.isValid())
- {
- environment.flushLog(true);
- }
- }
-
private boolean hasJobs()
{
return !_jobQueue.isEmpty();
@@ -381,44 +303,24 @@ public class CoalescingCommiter implements Committer
}
}
- public boolean start()
- {
- return _stopped.compareAndSet(true, false);
- }
-
- public void stop()
+ public void close()
{
- if (_stopped.compareAndSet(false, true))
+ RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted");
+ synchronized (_lock)
{
- synchronized (_lock)
+ _stopped.set(true);
+ BDBCommitFuture commit = null;
+ int abortedCommits = 0;
+ while ((commit = _jobQueue.poll()) != null)
{
- _lock.notifyAll();
+ abortedCommits++;
+ commit.abort(e);
}
- _jobQueue.clear();
- }
- }
-
- public void close()
- {
- if (_closed.compareAndSet(false, true))
- {
- RuntimeException e = new RuntimeException("Commit thread has been closed");
- synchronized (_lock)
+ if (LOGGER.isDebugEnabled() && abortedCommits > 0)
{
- _stopped.set(true);
- BDBCommitFuture commit = null;
- int abortedCommits = 0;
- while ((commit = _jobQueue.poll()) != null)
- {
- abortedCommits++;
- commit.abort(e);
- }
- if (LOGGER.isDebugEnabled() && abortedCommits > 0)
- {
- LOGGER.debug(abortedCommits + " commit(s) were aborted during close.");
- }
- _lock.notifyAll();
+ LOGGER.debug(abortedCommits + " commit(s) were aborted during close.");
}
+ _lock.notifyAll();
}
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
index bc9771f463..01e45d8ac5 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
@@ -31,8 +31,4 @@ public interface Committer
StoreFuture commit(Transaction tx, boolean syncCommit);
void stop();
-
- void close();
-
- boolean isStarted();
} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index 6a377babdf..c8f2250566 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -30,7 +30,6 @@ import com.sleepycat.je.Sequence;
import com.sleepycat.je.SequenceConfig;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.StoreFuture;
import com.sleepycat.je.Database;
@@ -129,10 +128,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
{
try
{
- if (_committer != null)
- {
- _committer.close();
- }
+ _committer.stop();
closeSequences();
closeDatabases();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index b0cac5fad9..dff5fc372d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -104,7 +104,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.SYNC;
static final SyncPolicy REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC;
- static final ReplicaAckPolicy REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY = ReplicaAckPolicy.SIMPLE_MAJORITY;
+ public static final ReplicaAckPolicy REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY = ReplicaAckPolicy.SIMPLE_MAJORITY;
@SuppressWarnings("serial")
private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
@@ -155,13 +155,13 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
private final Durability _defaultDurability;
- private final CoalescingCommiter _coalescingCommiter;
+ private final AtomicReference<Durability> _messageStoreDurability = new AtomicReference<Durability>();
+ private volatile Durability _realMessageStoreDurability = null;
+ private volatile CoalescingCommiter _coalescingCommiter = null;
private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
- private volatile SyncPolicy _messageStoreLocalTransactionSyncronizationPolicy = LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY;
- private volatile SyncPolicy _messageStoreRemoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
@@ -190,7 +190,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
_groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
- _coalescingCommiter = new CoalescingCommiter(_configuration.getGroupName(), this);
// create environment in a separate thread to avoid renaming of the current thread by JE
_environment = createEnvironment(true);
@@ -201,10 +200,15 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public Transaction beginTransaction()
{
+ if (_messageStoreDurability.get() == null)
+ {
+ throw new IllegalStateException("Message store durability is not set");
+ }
+
try
{
TransactionConfig transactionConfig = new TransactionConfig();
- transactionConfig.setDurability(getMessageStoreTransactionDurability());
+ transactionConfig.setDurability(getRealMessageStoreDurability());
return _environment.beginTransaction(null, transactionConfig);
}
catch(DatabaseException e)
@@ -220,13 +224,19 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
// Using commit() instead of commitNoSync() for the HA store to allow
// the HA durability configuration to influence resulting behaviour.
- tx.commit();
+ tx.commit(_realMessageStoreDurability);
}
catch (DatabaseException de)
{
throw handleDatabaseException("Got DatabaseException on commit, closing environment", de);
}
- return _coalescingCommiter.commit(tx, syncCommit);
+
+ if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
+ && _messageStoreDurability.get().getLocalSync() == SyncPolicy.SYNC)
+ {
+ return _coalescingCommiter.commit(tx, syncCommit);
+ }
+ return StoreFuture.IMMEDIATE_FUTURE;
}
@Override
@@ -248,7 +258,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
try
{
- _coalescingCommiter.close();
+ if (_coalescingCommiter != null)
+ {
+ _coalescingCommiter.stop();
+ }
closeSequences();
closeDatabases();
}
@@ -506,26 +519,19 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return (String)_configuration.getHelperHostPort();
}
- Durability getMessageStoreTransactionDurability()
+ Durability getRealMessageStoreDurability()
{
- SyncPolicy localSync = getMessageStoreLocalTransactionSyncronizationPolicy();
- if ( localSync == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY && _coalescingCommiter.isStarted())
- {
- localSync = SyncPolicy.NO_SYNC;
- }
- SyncPolicy replicaSync = getMessageStoreRemoteTransactionSyncronizationPolicy();
- return new Durability(localSync, replicaSync, getReplicaAcknowledgmentPolicy());
+ return _realMessageStoreDurability;
}
- public Durability getDurability()
+ public Durability getMessageStoreDurability()
{
- return new Durability(getMessageStoreLocalTransactionSyncronizationPolicy(),
- getMessageStoreRemoteTransactionSyncronizationPolicy(), getReplicaAcknowledgmentPolicy());
+ return _messageStoreDurability.get();
}
public boolean isCoalescingSync()
{
- return _coalescingCommiter.isStarted();
+ return _coalescingCommiter != null;
}
public String getNodeState()
@@ -1087,39 +1093,24 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- public SyncPolicy getMessageStoreLocalTransactionSyncronizationPolicy()
- {
- return _messageStoreLocalTransactionSyncronizationPolicy;
- }
-
- public SyncPolicy getMessageStoreRemoteTransactionSyncronizationPolicy()
- {
- return _messageStoreRemoteTransactionSyncronizationPolicy;
- }
-
- public ReplicaAckPolicy getReplicaAcknowledgmentPolicy()
- {
- return REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY;
- }
-
- public void setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy localTransactionSyncronizationPolicy)
+ public void setMessageStoreDurability(SyncPolicy localTransactionSynchronizationPolicy, SyncPolicy remoteTransactionSynchronizationPolicy, ReplicaAckPolicy replicaAcknowledgmentPolicy)
{
- _messageStoreLocalTransactionSyncronizationPolicy = localTransactionSyncronizationPolicy;
- if (localTransactionSyncronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)
+ if (_messageStoreDurability.compareAndSet(null, new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy )))
{
- _coalescingCommiter.start();
+ if (localTransactionSynchronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)
+ {
+ localTransactionSynchronizationPolicy = SyncPolicy.NO_SYNC;
+ _coalescingCommiter = new CoalescingCommiter(_configuration.getGroupName(), this);
+ _coalescingCommiter.start();
+ }
+ _realMessageStoreDurability = new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
}
else
{
- _coalescingCommiter.stop();
+ throw new IllegalStateException("Message store durability is already set to " + _messageStoreDurability.get());
}
}
- public void setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy remoteTransactionSyncronizationPolicy)
- {
- _messageStoreRemoteTransactionSyncronizationPolicy = remoteTransactionSyncronizationPolicy;
- }
-
private void populateExistingRemoteReplicationNodes()
{
ReplicationGroup group = _environment.getGroup();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
index 7182cb10af..8cecbe172e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
@@ -31,7 +31,7 @@ public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends Virtual
String REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = "remoteTransactionSynchronizationPolicy";
String LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = "localTransactionSynchronizationPolicy";
String COALESCING_SYNC = "coalescingSync";
- String REPLICA_ACKNOWLEDGMENT_POLICY = "replicaAcknowledgmentPolicy";
+ String DURABILITY = "durability";
@ManagedAttribute( defaultValue = "SYNC")
String getLocalTransactionSynchronizationPolicy();
@@ -40,8 +40,8 @@ public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends Virtual
String getRemoteTransactionSynchronizationPolicy();
@DerivedAttribute
- String getReplicaAcknowledgmentPolicy();
+ boolean isCoalescingSync();
@DerivedAttribute
- boolean isCoalescingSync();
+ String getDurability();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
index 6ba0a4dd39..b45d471960 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
@@ -42,10 +42,10 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
private final BDBConfigurationStore _configurationStore;
- @ManagedAttributeField(afterSet="setLocalTransactionSynchronizationPolicyOnEnvironment")
+ @ManagedAttributeField
private String _localTransactionSynchronizationPolicy;
- @ManagedAttributeField(afterSet="setRemoteTransactionSynchronizationPolicyOnEnvironment")
+ @ManagedAttributeField
private String _remoteTransactionSynchronizationPolicy;
@ManagedObjectFactoryConstructor
@@ -74,14 +74,13 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
return _remoteTransactionSynchronizationPolicy;
}
-
@Override
- public String getReplicaAcknowledgmentPolicy()
+ public String getDurability()
{
ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
if (facade != null)
{
- return facade.getReplicaAcknowledgmentPolicy().name();
+ return String.valueOf(facade.getMessageStoreDurability());
}
return null;
}
@@ -89,20 +88,21 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
@Override
public boolean isCoalescingSync()
{
- ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
- if (facade != null)
- {
- return facade.isCoalescingSync();
- }
- return false;
+ return _localTransactionSynchronizationPolicy.equals(SyncPolicy.SYNC.name());
}
@Override
public void onOpen()
{
+ ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
+ if (facade != null)
+ {
+ facade.setMessageStoreDurability(
+ SyncPolicy.valueOf(getLocalTransactionSynchronizationPolicy()),
+ SyncPolicy.valueOf(getRemoteTransactionSynchronizationPolicy()),
+ ReplicatedEnvironmentFacade.REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY);
+ }
super.onOpen();
- setRemoteTransactionSynchronizationPolicyOnEnvironment();
- setLocalTransactionSynchronizationPolicyOnEnvironment();
}
@Override
@@ -135,24 +135,6 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
}
}
- protected void setLocalTransactionSynchronizationPolicyOnEnvironment()
- {
- ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
- if (facade != null)
- {
- facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.valueOf(getLocalTransactionSynchronizationPolicy()));
- }
- }
-
- protected void setRemoteTransactionSynchronizationPolicyOnEnvironment()
- {
- ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
- if (facade != null)
- {
- facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.valueOf(getRemoteTransactionSynchronizationPolicy()));
- }
- }
-
private ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade()
{
return (ReplicatedEnvironmentFacade) _configurationStore.getEnvironmentFacade();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
index b230b7520e..fbf3041054 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
@@ -48,9 +48,6 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends
@ManagedAttribute(mandatory=true)
String getHelperAddress();
- @DerivedAttribute
- String getDurability();
-
@ManagedAttribute(defaultValue = "false")
boolean isDesignatedPrimary();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 90737a9385..74d273a660 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -161,17 +161,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
@Override
- public String getDurability()
- {
- ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
- if (environmentFacade != null)
- {
- return environmentFacade.getDurability().toString();
- }
- return null;
- }
-
- @Override
public boolean isDesignatedPrimary()
{
return _designatedPrimary;
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index f9bdaeda93..80f6e7ea49 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -477,7 +477,9 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC");
virtualHost.setAttributes(virtualHostAttributes);
- awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, false);
+ virtualHost.stop();
+ virtualHost.start();
+
assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSynchronizationPolicy());
assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy());
assertFalse("CoalescingSync is not OFF", virtualHost.isCoalescingSync());
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index ec5098f369..86d47a6a8b 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -40,7 +40,6 @@ import org.apache.qpid.util.FileUtils;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.Durability;
-import com.sleepycat.je.Durability.SyncPolicy;
import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.NodeState;
@@ -163,18 +162,24 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort());
}
- public void testGetDurability() throws Exception
+ public void testSetMessageStoreDurability() throws Exception
{
ReplicatedEnvironmentFacade master = createMaster();
- assertEquals("Unexpected message store durability", TEST_DURABILITY, master.getMessageStoreTransactionDurability());
- assertEquals("Unexpected durability", TEST_DURABILITY, master.getDurability());
- assertFalse("Coalescing syn before policy set to SYNC", master.isCoalescingSync());
- master.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.SYNC);
- assertTrue("Coalescing syn after policy set to SYNC", master.isCoalescingSync());
- assertEquals("Unexpected message store durability after committer start", "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", master.getMessageStoreTransactionDurability().toString());
- master.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
- assertEquals("Unexpected message store durability after committer stop", "WRITE_NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", master.getMessageStoreTransactionDurability().toString());
- assertFalse("Coalescing syn after policy set to WRITE_NO_SYNC", master.isCoalescingSync());
+ assertEquals("Unexpected message store durability",
+ new Durability(Durability.SyncPolicy.NO_SYNC, Durability.SyncPolicy.NO_SYNC, Durability.ReplicaAckPolicy.SIMPLE_MAJORITY),
+ master.getRealMessageStoreDurability());
+ assertEquals("Unexpected durability", TEST_DURABILITY, master.getMessageStoreDurability());
+ assertTrue("Unexpected coalescing syn", master.isCoalescingSync());
+
+ try
+ {
+ master.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), TEST_DURABILITY.getReplicaSync(), TEST_DURABILITY.getReplicaAck());
+ fail("Cannot set message store durability twice");
+ }
+ catch(IllegalStateException e)
+ {
+ // pass
+ }
}
public void testGetNodeState() throws Exception
@@ -619,26 +624,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
}
- public void testSetLocalTransactionSyncronizationPolicy() throws Exception
- {
- ReplicatedEnvironmentFacade facade = createMaster();
- assertEquals("Unexpected local transaction synchronization policy before change",
- ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreLocalTransactionSyncronizationPolicy());
- facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
- assertEquals("Unexpected local transaction synchronization policy after change",
- SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreLocalTransactionSyncronizationPolicy());
- }
-
- public void testSetRemoteTransactionSyncronizationPolicy() throws Exception
- {
- ReplicatedEnvironmentFacade facade = createMaster();
- assertEquals("Unexpected remote transaction synchronization policy before change",
- ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreRemoteTransactionSyncronizationPolicy());
- facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.WRITE_NO_SYNC);
- assertEquals("Unexpected remote transaction synchronization policy after change",
- SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreRemoteTransactionSyncronizationPolicy());
- }
-
public void testBeginTransaction() throws Exception
{
ReplicatedEnvironmentFacade facade = createMaster();
@@ -696,6 +681,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
ref.setStateChangeListener(stateChangeListener);
ref.setReplicationGroupListener(replicationGroupListener);
+ ref.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), TEST_DURABILITY.getReplicaSync(), TEST_DURABILITY.getReplicaAck());
_nodes.put(nodeName, ref);
return ref;
}