summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java31
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java131
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java32
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java36
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java13
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java4
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java8
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java40
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java16
11 files changed, 139 insertions, 182 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
index 279762eacd..50a05ef6b8 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
@@ -127,8 +127,6 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
private final EnvironmentFacadeFactory _environmentFacadeFactory;
- private volatile Committer _committer;
-
private String _storeLocation;
private final BDBMessageStore _messageStoreFacade = new BDBMessageStore();
private ConfiguredObject<?> _parent;
@@ -641,9 +639,6 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
);
_storeLocation = _environmentFacade.getStoreLocation();
}
-
- _committer = _environmentFacade.createCommitter(parent.getName());
- _committer.start();
}
}
@@ -699,22 +694,9 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
@Override
public void closeMessageStore()
{
- if (_messageStoreOpen.compareAndSet(true, false))
+ if (_messageStoreOpen.compareAndSet(true, false) && !_configurationStoreOpen.get())
{
- try
- {
- if (_committer != null)
- {
- _committer.close();
- }
- }
- finally
- {
- if (!_configurationStoreOpen.get())
- {
- closeEnvironment();
- }
- }
+ closeEnvironment();
}
}
@@ -908,8 +890,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
LOGGER.debug("Deleted content for message " + messageId);
}
- _environmentFacade.commit(tx);
- _committer.commit(tx, sync);
+ _environmentFacade.commit(tx, sync);
complete = true;
tx = null;
@@ -1334,8 +1315,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
throw new StoreException("Fatal internal error: transactional is null at commitTran");
}
- _environmentFacade.commit(tx);
- StoreFuture result = _committer.commit(tx, syncCommit);
+ StoreFuture result = _environmentFacade.commit(tx, syncCommit);
if (LOGGER.isDebugEnabled())
{
@@ -1617,8 +1597,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
throw _environmentFacade.handleDatabaseException("failed to begin transaction", e);
}
store(txn);
- _environmentFacade.commit(txn);
- _committer.commit(txn, true);
+ _environmentFacade.commit(txn, true);
storedSizeChangeOccurred(getMetaData().getContentSize());
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
index c76f4ff2db..90caa85ad5 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.StoreFuture;
@@ -38,12 +37,10 @@ import com.sleepycat.je.Transaction;
public class CoalescingCommiter implements Committer
{
private final CommitTask _commitTask;
- private final AtomicReference<Boolean> _started;
private final ExecutorService _taskExecutor;
public CoalescingCommiter(final String name, EnvironmentFacade environmentFacade)
{
- _started = new AtomicReference<Boolean>(false);
_commitTask = new CommitTask(environmentFacade);
_taskExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
{
@@ -60,7 +57,7 @@ public class CoalescingCommiter implements Committer
@Override
public void start()
{
- if (_started.compareAndSet(false, true))
+ if (_commitTask.start())
{
_taskExecutor.submit(_commitTask);
}
@@ -69,10 +66,7 @@ public class CoalescingCommiter implements Committer
@Override
public void stop()
{
- if (_started.compareAndSet(true, false))
- {
- _commitTask.stop();
- }
+ _commitTask.stop();
}
@Override
@@ -80,7 +74,6 @@ public class CoalescingCommiter implements Committer
{
try
{
- _started.set(false);
_commitTask.close();
}
finally
@@ -92,7 +85,7 @@ public class CoalescingCommiter implements Committer
@Override
public StoreFuture commit(Transaction tx, boolean syncCommit)
{
- if (_started.get())
+ if (isStarted())
{
BDBCommitFuture commitFuture = new BDBCommitFuture(_commitTask, tx, syncCommit);
try
@@ -111,7 +104,7 @@ public class CoalescingCommiter implements Committer
public boolean isStarted()
{
- return _started.get();
+ return !_commitTask.isStopped();
}
private static final class BDBCommitFuture implements StoreFuture
@@ -187,11 +180,6 @@ public class CoalescingCommiter implements Committer
while (!isComplete())
{
- if (_commitTask.isStopped())
- {
- throw new IllegalStateException("Commit thread is stopped");
- }
-
_commitTask.explicitNotify();
try
{
@@ -201,6 +189,24 @@ public class CoalescingCommiter implements Committer
{
throw new RuntimeException(e);
}
+
+ if (!_commitTask.isClosed() && _commitTask.isStopped() && !isComplete())
+ {
+ // coalesing sync is not required anymore
+ // flush log and mark transaction as completed
+ try
+ {
+ _commitTask.flushLog();
+ }
+ catch(DatabaseException e)
+ {
+ _databaseException = e;
+ }
+ finally
+ {
+ complete();
+ }
+ }
}
if(LOGGER.isDebugEnabled())
@@ -224,6 +230,7 @@ public class CoalescingCommiter implements Committer
private static final Logger LOGGER = Logger.getLogger(CommitTask.class);
private final AtomicBoolean _stopped = new AtomicBoolean(true);
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
private final Object _lock = new Object();
private final EnvironmentFacade _environmentFacade;
@@ -233,6 +240,11 @@ public class CoalescingCommiter implements Committer
_environmentFacade = environmentFacade;
}
+ public boolean isClosed()
+ {
+ return _closed.get();
+ }
+
public boolean isStopped()
{
return _stopped.get();
@@ -249,27 +261,30 @@ public class CoalescingCommiter implements Committer
@Override
public void run()
{
- if (_stopped.compareAndSet(true, false))
+ while (!_stopped.get())
{
- while (!_stopped.get())
+ synchronized (_lock)
{
- synchronized (_lock)
+ while (!_stopped.get() && !hasJobs())
{
- while (!_stopped.get() && !hasJobs())
+ try
+ {
+ // Periodically wake up and check, just in case we
+ // missed a notification. Don't want to lock the broker hard.
+ _lock.wait(1000);
+ }
+ catch (InterruptedException e)
{
- try
- {
- // Periodically wake up and check, just in case we
- // missed a notification. Don't want to lock the broker hard.
- _lock.wait(1000);
- }
- catch (InterruptedException e)
- {
- }
}
}
- processJobs();
}
+ processJobs();
+ }
+
+ // process remaining jobs if such were added whilst stopped
+ if (hasJobs())
+ {
+ processJobs();
}
}
@@ -285,11 +300,7 @@ public class CoalescingCommiter implements Committer
startTime = System.currentTimeMillis();
}
- Environment environment = _environmentFacade.getEnvironment();
- if (environment != null && environment.isValid())
- {
- environment.flushLog(true);
- }
+ flushLog();
if(LOGGER.isDebugEnabled())
{
@@ -340,6 +351,15 @@ public class CoalescingCommiter implements Committer
}
}
+ private void flushLog()
+ {
+ Environment environment = _environmentFacade.getEnvironment();
+ if (environment != null && environment.isValid())
+ {
+ environment.flushLog(true);
+ }
+ }
+
private boolean hasJobs()
{
return !_jobQueue.isEmpty();
@@ -361,35 +381,44 @@ public class CoalescingCommiter implements Committer
}
}
+ public boolean start()
+ {
+ return _stopped.compareAndSet(true, false);
+ }
+
public void stop()
{
- synchronized (_lock)
+ if (_stopped.compareAndSet(false, true))
{
- if (_stopped.compareAndSet(false, true) || hasJobs())
+ synchronized (_lock)
{
- processJobs();
+ _lock.notifyAll();
}
+ _jobQueue.clear();
}
}
public void close()
{
- RuntimeException e = new RuntimeException("Commit thread has been stopped");
- synchronized (_lock)
+ if (_closed.compareAndSet(false, true))
{
- _stopped.set(true);
- BDBCommitFuture commit = null;
- int abortedCommits = 0;
- while ((commit = _jobQueue.poll()) != null)
- {
- abortedCommits++;
- commit.abort(e);
- }
- if (LOGGER.isDebugEnabled() && abortedCommits > 0)
+ RuntimeException e = new RuntimeException("Commit thread has been closed");
+ synchronized (_lock)
{
- LOGGER.debug(abortedCommits + " commit(s) were aborted during close.");
+ _stopped.set(true);
+ BDBCommitFuture commit = null;
+ int abortedCommits = 0;
+ while ((commit = _jobQueue.poll()) != null)
+ {
+ abortedCommits++;
+ commit.abort(e);
+ }
+ if (LOGGER.isDebugEnabled() && abortedCommits > 0)
+ {
+ LOGGER.debug(abortedCommits + " commit(s) were aborted during close.");
+ }
+ _lock.notifyAll();
}
- _lock.notifyAll();
}
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
index d1362ed89c..bc9771f463 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
@@ -35,36 +35,4 @@ public interface Committer
void close();
boolean isStarted();
-
- Committer IMMEDIATE_FUTURE_COMMITTER = new Committer()
- {
-
- @Override
- public void start()
- {
- }
-
- @Override
- public StoreFuture commit(Transaction tx, boolean syncCommit)
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
- public void stop()
- {
- }
-
- @Override
- public void close()
- {
- }
-
- @Override
- public boolean isStarted()
- {
- return true;
- }
- };
-
} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index 8e62b4c476..3c8fe00d01 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -24,6 +24,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.server.store.StoreFuture;
+
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
@@ -50,11 +52,9 @@ public interface EnvironmentFacade
Sequence openSequence(Database database, DatabaseEntry sequenceKey, SequenceConfig sequenceConfig);
- Committer createCommitter(String name);
-
Transaction beginTransaction();
- void commit(com.sleepycat.je.Transaction tx);
+ StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync);
DatabaseException handleDatabaseException(String contextMessage, DatabaseException e);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index ee7ea79e8e..f2690a5aa1 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -21,15 +21,16 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
-import java.net.ServerSocket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Sequence;
import com.sleepycat.je.SequenceConfig;
-import com.sun.org.apache.xerces.internal.dom.DeepNodeListImpl;
+
import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.StoreFuture;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -48,6 +49,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
private Environment _environment;
+ private final Committer _committer;
public StandardEnvironmentFacade(String storePath,
Map<String, String> attributes)
@@ -69,6 +71,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
}
}
+ String name = (String)attributes.get(ConfiguredObject.NAME);
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
@@ -82,6 +85,9 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
_environment = new Environment(environmentPath, envConfig);
+
+ _committer = new CoalescingCommiter(name, this);
+ _committer.start();
}
@@ -92,7 +98,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
}
@Override
- public void commit(com.sleepycat.je.Transaction tx)
+ public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
{
try
{
@@ -106,14 +112,26 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
throw handleDatabaseException("Got DatabaseException on commit", de);
}
+ return _committer.commit(tx, syncCommit);
}
@Override
public void close()
{
- closeSequences();
- closeDatabases();
- closeEnvironment();
+ try
+ {
+ if (_committer != null)
+ {
+ _committer.close();
+ }
+
+ closeSequences();
+ closeDatabases();
+ }
+ finally
+ {
+ closeEnvironment();
+ }
}
private void closeSequences()
@@ -296,12 +314,6 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
}
@Override
- public Committer createCommitter(String name)
- {
- return new CoalescingCommiter(name, this);
- }
-
- @Override
public String getStoreLocation()
{
return _storePath;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 7a999374e8..2d411724a6 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -45,9 +45,10 @@ import java.util.concurrent.atomic.AtomicReference;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Sequence;
import com.sleepycat.je.SequenceConfig;
+
import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
-import org.apache.qpid.server.store.berkeleydb.Committer;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
import org.apache.qpid.server.util.DaemonThreadFactory;
@@ -213,7 +214,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
@Override
- public void commit(final Transaction tx)
+ public StoreFuture commit(final Transaction tx, boolean syncCommit)
{
try
{
@@ -225,6 +226,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
throw handleDatabaseException("Got DatabaseException on commit, closing environment", de);
}
+ return _coalescingCommiter.commit(tx, syncCommit);
}
@Override
@@ -246,6 +248,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
try
{
+ _coalescingCommiter.close();
closeSequences();
closeDatabases();
}
@@ -1036,12 +1039,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return environment;
}
- @Override
- public Committer createCommitter(String name)
- {
- return _coalescingCommiter;
- }
-
NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
{
if (repNode == null)
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
index 84ac9ca277..7182cb10af 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
@@ -28,8 +28,8 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>
{
- String REMOTE_TRANSACTION_SYNCRONIZATION_POLICY = "remoteTransactionSynchronizationPolicy";
- String LOCAL_TRANSACTION_SYNCRONIZATION_POLICY = "localTransactionSynchronizationPolicy";
+ String REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = "remoteTransactionSynchronizationPolicy";
+ String LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = "localTransactionSynchronizationPolicy";
String COALESCING_SYNC = "coalescingSync";
String REPLICA_ACKNOWLEDGMENT_POLICY = "replicaAcknowledgmentPolicy";
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
index af82d9fd2d..f66d157246 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
@@ -131,13 +131,13 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
{
super.validateChange(proxyForValidation, changedAttributes);
- if(changedAttributes.contains(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY))
+ if(changedAttributes.contains(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY))
{
String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getLocalTransactionSynchronizationPolicy();
validateTransactionSynchronizationPolicy(policy);
}
- if(changedAttributes.contains(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY))
+ if(changedAttributes.contains(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY))
{
String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSynchronizationPolicy();
validateTransactionSynchronizationPolicy(policy);
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index fd196a28da..a2ef422046 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -472,8 +472,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertTrue("CoalescingSync is not ON", virtualHost.isCoalescingSync());
Map<String, Object> virtualHostAttributes = new HashMap<String,Object>();
- virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "WRITE_NO_SYNC");
- virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC");
+ virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "WRITE_NO_SYNC");
+ virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC");
virtualHost.setAttributes(virtualHostAttributes);
awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, false);
@@ -482,7 +482,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertFalse("CoalescingSync is not OFF", virtualHost.isCoalescingSync());
try
{
- virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID"));
+ virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "INVALID"));
fail("Invalid syncronization policy is set");
}
catch(IllegalArgumentException e)
@@ -492,7 +492,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
try
{
- virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID"));
+ virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "INVALID"));
fail("Invalid syncronization policy is set");
}
catch(IllegalArgumentException e)
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index b14332ecf6..67364ada35 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.qpid.server.store.berkeleydb.Committer;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -169,29 +168,13 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
ReplicatedEnvironmentFacade master = createMaster();
assertEquals("Unexpected message store durability", TEST_DURABILITY, master.getMessageStoreTransactionDurability());
assertEquals("Unexpected durability", TEST_DURABILITY, master.getDurability());
- Committer committer = master.createCommitter(TEST_GROUP_NAME);
- committer.start();
-
- waitForCommitter(committer, true);
-
+ assertFalse("Coalescing syn before policy set to SYNC", master.isCoalescingSync());
+ master.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.SYNC);
+ assertTrue("Coalescing syn after policy set to SYNC", master.isCoalescingSync());
assertEquals("Unexpected message store durability after committer start", "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", master.getMessageStoreTransactionDurability().toString());
-
- committer.stop();
- waitForCommitter(committer, false);
- assertEquals("Unexpected message store durability after committer stop", TEST_DURABILITY, master.getMessageStoreTransactionDurability());
- }
-
- public void testIsCoalescingSync() throws Exception
- {
- ReplicatedEnvironmentFacade master = createMaster();
- assertEquals("Unexpected coalescing sync", false, master.isCoalescingSync());
- Committer committer = master.createCommitter(TEST_GROUP_NAME);
- committer.start();
- waitForCommitter(committer, true);
- assertEquals("Unexpected coalescing sync", true, master.isCoalescingSync());
- committer.stop();
- waitForCommitter(committer, false);
- assertEquals("Unexpected coalescing sync", false, master.isCoalescingSync());
+ master.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
+ assertEquals("Unexpected message store durability after committer stop", "WRITE_NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", master.getMessageStoreTransactionDurability().toString());
+ assertFalse("Coalescing syn after policy set to WRITE_NO_SYNC", master.isCoalescingSync());
}
public void testGetNodeState() throws Exception
@@ -719,17 +702,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, replicationGroupListener);
}
- private void waitForCommitter(Committer committer, boolean expected) throws InterruptedException
- {
- int counter = 0;
- while(committer.isStarted() != expected && counter < 100)
- {
- Thread.sleep(20);
- counter++;
- }
- assertEquals("Committer is not in expected state", expected, committer.isStarted());
- }
-
private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary)
{
ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class);
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
index ca762cd86d..80d940bc1b 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY;
-import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY;
+import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY;
+import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
import java.io.File;
import java.io.IOException;
@@ -96,24 +96,24 @@ public class BDBHAVirtualHostRestTest extends QpidRestTestCase
public void testSetLocalTransactionSynchronizationPolicy() throws Exception
{
Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
- assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY));
+ assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
- Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "NO_SYNC");
+ Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC");
getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200);
hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
- assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY));
+ assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
}
public void testSetRemoteTransactionSynchronizationPolicy() throws Exception
{
Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
- assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY));
+ assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
- Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC");
+ Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC");
getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200);
hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
- assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY));
+ assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
}
}