diff options
Diffstat (limited to 'qpid/java')
11 files changed, 139 insertions, 182 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java index 279762eacd..50a05ef6b8 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java @@ -127,8 +127,6 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi private final EnvironmentFacadeFactory _environmentFacadeFactory; - private volatile Committer _committer; - private String _storeLocation; private final BDBMessageStore _messageStoreFacade = new BDBMessageStore(); private ConfiguredObject<?> _parent; @@ -641,9 +639,6 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi ); _storeLocation = _environmentFacade.getStoreLocation(); } - - _committer = _environmentFacade.createCommitter(parent.getName()); - _committer.start(); } } @@ -699,22 +694,9 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi @Override public void closeMessageStore() { - if (_messageStoreOpen.compareAndSet(true, false)) + if (_messageStoreOpen.compareAndSet(true, false) && !_configurationStoreOpen.get()) { - try - { - if (_committer != null) - { - _committer.close(); - } - } - finally - { - if (!_configurationStoreOpen.get()) - { - closeEnvironment(); - } - } + closeEnvironment(); } } @@ -908,8 +890,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi LOGGER.debug("Deleted content for message " + messageId); } - _environmentFacade.commit(tx); - _committer.commit(tx, sync); + _environmentFacade.commit(tx, sync); complete = true; tx = null; @@ -1334,8 +1315,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi throw new StoreException("Fatal internal error: transactional is null at commitTran"); } - _environmentFacade.commit(tx); - StoreFuture result = _committer.commit(tx, syncCommit); + StoreFuture result = _environmentFacade.commit(tx, syncCommit); if (LOGGER.isDebugEnabled()) { @@ -1617,8 +1597,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi throw _environmentFacade.handleDatabaseException("failed to begin transaction", e); } store(txn); - _environmentFacade.commit(txn); - _committer.commit(txn, true); + _environmentFacade.commit(txn, true); storedSizeChangeOccurred(getMetaData().getContentSize()); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java index c76f4ff2db..90caa85ad5 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java @@ -26,7 +26,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; import org.apache.qpid.server.store.StoreFuture; @@ -38,12 +37,10 @@ import com.sleepycat.je.Transaction; public class CoalescingCommiter implements Committer { private final CommitTask _commitTask; - private final AtomicReference<Boolean> _started; private final ExecutorService _taskExecutor; public CoalescingCommiter(final String name, EnvironmentFacade environmentFacade) { - _started = new AtomicReference<Boolean>(false); _commitTask = new CommitTask(environmentFacade); _taskExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { @@ -60,7 +57,7 @@ public class CoalescingCommiter implements Committer @Override public void start() { - if (_started.compareAndSet(false, true)) + if (_commitTask.start()) { _taskExecutor.submit(_commitTask); } @@ -69,10 +66,7 @@ public class CoalescingCommiter implements Committer @Override public void stop() { - if (_started.compareAndSet(true, false)) - { - _commitTask.stop(); - } + _commitTask.stop(); } @Override @@ -80,7 +74,6 @@ public class CoalescingCommiter implements Committer { try { - _started.set(false); _commitTask.close(); } finally @@ -92,7 +85,7 @@ public class CoalescingCommiter implements Committer @Override public StoreFuture commit(Transaction tx, boolean syncCommit) { - if (_started.get()) + if (isStarted()) { BDBCommitFuture commitFuture = new BDBCommitFuture(_commitTask, tx, syncCommit); try @@ -111,7 +104,7 @@ public class CoalescingCommiter implements Committer public boolean isStarted() { - return _started.get(); + return !_commitTask.isStopped(); } private static final class BDBCommitFuture implements StoreFuture @@ -187,11 +180,6 @@ public class CoalescingCommiter implements Committer while (!isComplete()) { - if (_commitTask.isStopped()) - { - throw new IllegalStateException("Commit thread is stopped"); - } - _commitTask.explicitNotify(); try { @@ -201,6 +189,24 @@ public class CoalescingCommiter implements Committer { throw new RuntimeException(e); } + + if (!_commitTask.isClosed() && _commitTask.isStopped() && !isComplete()) + { + // coalesing sync is not required anymore + // flush log and mark transaction as completed + try + { + _commitTask.flushLog(); + } + catch(DatabaseException e) + { + _databaseException = e; + } + finally + { + complete(); + } + } } if(LOGGER.isDebugEnabled()) @@ -224,6 +230,7 @@ public class CoalescingCommiter implements Committer private static final Logger LOGGER = Logger.getLogger(CommitTask.class); private final AtomicBoolean _stopped = new AtomicBoolean(true); + private final AtomicBoolean _closed = new AtomicBoolean(false); private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); private final Object _lock = new Object(); private final EnvironmentFacade _environmentFacade; @@ -233,6 +240,11 @@ public class CoalescingCommiter implements Committer _environmentFacade = environmentFacade; } + public boolean isClosed() + { + return _closed.get(); + } + public boolean isStopped() { return _stopped.get(); @@ -249,27 +261,30 @@ public class CoalescingCommiter implements Committer @Override public void run() { - if (_stopped.compareAndSet(true, false)) + while (!_stopped.get()) { - while (!_stopped.get()) + synchronized (_lock) { - synchronized (_lock) + while (!_stopped.get() && !hasJobs()) { - while (!_stopped.get() && !hasJobs()) + try + { + // Periodically wake up and check, just in case we + // missed a notification. Don't want to lock the broker hard. + _lock.wait(1000); + } + catch (InterruptedException e) { - try - { - // Periodically wake up and check, just in case we - // missed a notification. Don't want to lock the broker hard. - _lock.wait(1000); - } - catch (InterruptedException e) - { - } } } - processJobs(); } + processJobs(); + } + + // process remaining jobs if such were added whilst stopped + if (hasJobs()) + { + processJobs(); } } @@ -285,11 +300,7 @@ public class CoalescingCommiter implements Committer startTime = System.currentTimeMillis(); } - Environment environment = _environmentFacade.getEnvironment(); - if (environment != null && environment.isValid()) - { - environment.flushLog(true); - } + flushLog(); if(LOGGER.isDebugEnabled()) { @@ -340,6 +351,15 @@ public class CoalescingCommiter implements Committer } } + private void flushLog() + { + Environment environment = _environmentFacade.getEnvironment(); + if (environment != null && environment.isValid()) + { + environment.flushLog(true); + } + } + private boolean hasJobs() { return !_jobQueue.isEmpty(); @@ -361,35 +381,44 @@ public class CoalescingCommiter implements Committer } } + public boolean start() + { + return _stopped.compareAndSet(true, false); + } + public void stop() { - synchronized (_lock) + if (_stopped.compareAndSet(false, true)) { - if (_stopped.compareAndSet(false, true) || hasJobs()) + synchronized (_lock) { - processJobs(); + _lock.notifyAll(); } + _jobQueue.clear(); } } public void close() { - RuntimeException e = new RuntimeException("Commit thread has been stopped"); - synchronized (_lock) + if (_closed.compareAndSet(false, true)) { - _stopped.set(true); - BDBCommitFuture commit = null; - int abortedCommits = 0; - while ((commit = _jobQueue.poll()) != null) - { - abortedCommits++; - commit.abort(e); - } - if (LOGGER.isDebugEnabled() && abortedCommits > 0) + RuntimeException e = new RuntimeException("Commit thread has been closed"); + synchronized (_lock) { - LOGGER.debug(abortedCommits + " commit(s) were aborted during close."); + _stopped.set(true); + BDBCommitFuture commit = null; + int abortedCommits = 0; + while ((commit = _jobQueue.poll()) != null) + { + abortedCommits++; + commit.abort(e); + } + if (LOGGER.isDebugEnabled() && abortedCommits > 0) + { + LOGGER.debug(abortedCommits + " commit(s) were aborted during close."); + } + _lock.notifyAll(); } - _lock.notifyAll(); } } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java index d1362ed89c..bc9771f463 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java @@ -35,36 +35,4 @@ public interface Committer void close(); boolean isStarted(); - - Committer IMMEDIATE_FUTURE_COMMITTER = new Committer() - { - - @Override - public void start() - { - } - - @Override - public StoreFuture commit(Transaction tx, boolean syncCommit) - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override - public void stop() - { - } - - @Override - public void close() - { - } - - @Override - public boolean isStarted() - { - return true; - } - }; - }
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index 8e62b4c476..3c8fe00d01 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -24,6 +24,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.qpid.server.store.StoreFuture; + import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; @@ -50,11 +52,9 @@ public interface EnvironmentFacade Sequence openSequence(Database database, DatabaseEntry sequenceKey, SequenceConfig sequenceConfig); - Committer createCommitter(String name); - Transaction beginTransaction(); - void commit(com.sleepycat.je.Transaction tx); + StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync); DatabaseException handleDatabaseException(String contextMessage, DatabaseException e); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index ee7ea79e8e..f2690a5aa1 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -21,15 +21,16 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; -import java.net.ServerSocket; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.Sequence; import com.sleepycat.je.SequenceConfig; -import com.sun.org.apache.xerces.internal.dom.DeepNodeListImpl; + import org.apache.log4j.Logger; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.StoreFuture; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -48,6 +49,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>(); private Environment _environment; + private final Committer _committer; public StandardEnvironmentFacade(String storePath, Map<String, String> attributes) @@ -69,6 +71,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } } + String name = (String)attributes.get(ConfiguredObject.NAME); EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); envConfig.setTransactional(true); @@ -82,6 +85,9 @@ public class StandardEnvironmentFacade implements EnvironmentFacade envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); _environment = new Environment(environmentPath, envConfig); + + _committer = new CoalescingCommiter(name, this); + _committer.start(); } @@ -92,7 +98,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } @Override - public void commit(com.sleepycat.je.Transaction tx) + public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) { try { @@ -106,14 +112,26 @@ public class StandardEnvironmentFacade implements EnvironmentFacade throw handleDatabaseException("Got DatabaseException on commit", de); } + return _committer.commit(tx, syncCommit); } @Override public void close() { - closeSequences(); - closeDatabases(); - closeEnvironment(); + try + { + if (_committer != null) + { + _committer.close(); + } + + closeSequences(); + closeDatabases(); + } + finally + { + closeEnvironment(); + } } private void closeSequences() @@ -296,12 +314,6 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } @Override - public Committer createCommitter(String name) - { - return new CoalescingCommiter(name, this); - } - - @Override public String getStoreLocation() { return _storePath; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 7a999374e8..2d411724a6 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -45,9 +45,10 @@ import java.util.concurrent.atomic.AtomicReference; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.Sequence; import com.sleepycat.je.SequenceConfig; + import org.apache.log4j.Logger; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; -import org.apache.qpid.server.store.berkeleydb.Committer; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener; import org.apache.qpid.server.util.DaemonThreadFactory; @@ -213,7 +214,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } @Override - public void commit(final Transaction tx) + public StoreFuture commit(final Transaction tx, boolean syncCommit) { try { @@ -225,6 +226,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { throw handleDatabaseException("Got DatabaseException on commit, closing environment", de); } + return _coalescingCommiter.commit(tx, syncCommit); } @Override @@ -246,6 +248,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan try { + _coalescingCommiter.close(); closeSequences(); closeDatabases(); } @@ -1036,12 +1039,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return environment; } - @Override - public Committer createCommitter(String name) - { - return _coalescingCommiter; - } - NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException { if (repNode == null) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java index 84ac9ca277..7182cb10af 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java @@ -28,8 +28,8 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>> { - String REMOTE_TRANSACTION_SYNCRONIZATION_POLICY = "remoteTransactionSynchronizationPolicy"; - String LOCAL_TRANSACTION_SYNCRONIZATION_POLICY = "localTransactionSynchronizationPolicy"; + String REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = "remoteTransactionSynchronizationPolicy"; + String LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = "localTransactionSynchronizationPolicy"; String COALESCING_SYNC = "coalescingSync"; String REPLICA_ACKNOWLEDGMENT_POLICY = "replicaAcknowledgmentPolicy"; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java index af82d9fd2d..f66d157246 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java @@ -131,13 +131,13 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm { super.validateChange(proxyForValidation, changedAttributes); - if(changedAttributes.contains(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY)) + if(changedAttributes.contains(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)) { String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getLocalTransactionSynchronizationPolicy(); validateTransactionSynchronizationPolicy(policy); } - if(changedAttributes.contains(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY)) + if(changedAttributes.contains(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY)) { String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSynchronizationPolicy(); validateTransactionSynchronizationPolicy(policy); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index fd196a28da..a2ef422046 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -472,8 +472,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertTrue("CoalescingSync is not ON", virtualHost.isCoalescingSync()); Map<String, Object> virtualHostAttributes = new HashMap<String,Object>(); - virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "WRITE_NO_SYNC"); - virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC"); + virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "WRITE_NO_SYNC"); + virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC"); virtualHost.setAttributes(virtualHostAttributes); awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, false); @@ -482,7 +482,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertFalse("CoalescingSync is not OFF", virtualHost.isCoalescingSync()); try { - virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID")); + virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "INVALID")); fail("Invalid syncronization policy is set"); } catch(IllegalArgumentException e) @@ -492,7 +492,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase try { - virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID")); + virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "INVALID")); fail("Invalid syncronization policy is set"); } catch(IllegalArgumentException e) diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index b14332ecf6..67364ada35 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.apache.qpid.server.store.berkeleydb.Committer; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestFileUtils; @@ -169,29 +168,13 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase ReplicatedEnvironmentFacade master = createMaster(); assertEquals("Unexpected message store durability", TEST_DURABILITY, master.getMessageStoreTransactionDurability()); assertEquals("Unexpected durability", TEST_DURABILITY, master.getDurability()); - Committer committer = master.createCommitter(TEST_GROUP_NAME); - committer.start(); - - waitForCommitter(committer, true); - + assertFalse("Coalescing syn before policy set to SYNC", master.isCoalescingSync()); + master.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.SYNC); + assertTrue("Coalescing syn after policy set to SYNC", master.isCoalescingSync()); assertEquals("Unexpected message store durability after committer start", "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", master.getMessageStoreTransactionDurability().toString()); - - committer.stop(); - waitForCommitter(committer, false); - assertEquals("Unexpected message store durability after committer stop", TEST_DURABILITY, master.getMessageStoreTransactionDurability()); - } - - public void testIsCoalescingSync() throws Exception - { - ReplicatedEnvironmentFacade master = createMaster(); - assertEquals("Unexpected coalescing sync", false, master.isCoalescingSync()); - Committer committer = master.createCommitter(TEST_GROUP_NAME); - committer.start(); - waitForCommitter(committer, true); - assertEquals("Unexpected coalescing sync", true, master.isCoalescingSync()); - committer.stop(); - waitForCommitter(committer, false); - assertEquals("Unexpected coalescing sync", false, master.isCoalescingSync()); + master.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC); + assertEquals("Unexpected message store durability after committer stop", "WRITE_NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", master.getMessageStoreTransactionDurability().toString()); + assertFalse("Coalescing syn after policy set to WRITE_NO_SYNC", master.isCoalescingSync()); } public void testGetNodeState() throws Exception @@ -719,17 +702,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, replicationGroupListener); } - private void waitForCommitter(Committer committer, boolean expected) throws InterruptedException - { - int counter = 0; - while(committer.isStarted() != expected && counter < 100) - { - Thread.sleep(20); - counter++; - } - assertEquals("Committer is not in expected state", expected, committer.isStarted()); - } - private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary) { ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java index ca762cd86d..80d940bc1b 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.server.store.berkeleydb; -import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY; -import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY; +import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY; +import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY; import java.io.File; import java.io.IOException; @@ -96,24 +96,24 @@ public class BDBHAVirtualHostRestTest extends QpidRestTestCase public void testSetLocalTransactionSynchronizationPolicy() throws Exception { Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name()); - assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY)); + assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)); - Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "NO_SYNC"); + Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC"); getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200); hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); - assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY)); + assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)); } public void testSetRemoteTransactionSynchronizationPolicy() throws Exception { Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name()); - assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY)); + assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY)); - Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC"); + Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC"); getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200); hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); - assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY)); + assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY)); } } |
