diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-19 17:32:33 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-19 17:32:33 +0000 |
| commit | 67a9864ed236ed085e263beaea7bae2c52522331 (patch) | |
| tree | 7b68eea9dfab1c7f958b68541a00dfa17da3a334 /qpid/java/bdbstore/src | |
| parent | db70f1d2908f294fee0ed47cdb478c3ab0f3b252 (diff) | |
| download | qpid-python-67a9864ed236ed085e263beaea7bae2c52522331.tar.gz | |
Make close return a future, wait on Future in broker shutdown
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1660949 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
7 files changed, 118 insertions, 48 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index e8c337a578..be9248c0d2 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -51,7 +51,7 @@ import org.apache.qpid.server.store.EventManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.Xid; @@ -924,14 +924,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore * * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. */ - private StoreFuture commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException + private FutureResult commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException { if (tx == null) { throw new StoreException("Fatal internal error: transactional is null at commitTran"); } - StoreFuture result = getEnvironmentFacade().commit(tx, syncCommit); + FutureResult result = getEnvironmentFacade().commit(tx, syncCommit); if (getLogger().isDebugEnabled()) { @@ -1386,7 +1386,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - synchronized StoreFuture flushToStore() + synchronized FutureResult flushToStore() { if(!stored()) { @@ -1407,7 +1407,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore storedSizeChangeOccurred(getMetaData().getContentSize()); } - return StoreFuture.IMMEDIATE_FUTURE; + return FutureResult.IMMEDIATE_FUTURE; } @Override @@ -1526,14 +1526,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public StoreFuture commitTranAsync() throws StoreException + public FutureResult commitTranAsync() throws StoreException { checkMessageStoreOpen(); doPreCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); - StoreFuture storeFuture = AbstractBDBMessageStore.this.commitTranImpl(_txn, false); + FutureResult futureResult = AbstractBDBMessageStore.this.commitTranImpl(_txn, false); doPostCommitActions(); - return storeFuture; + return futureResult; } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java index 964335869d..2a8cf92b3d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import com.sleepycat.je.DatabaseException; @@ -29,7 +30,7 @@ import com.sleepycat.je.Environment; import com.sleepycat.je.Transaction; import org.apache.log4j.Logger; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; public class CoalescingCommiter implements Committer { @@ -65,16 +66,16 @@ public class CoalescingCommiter implements Committer } @Override - public StoreFuture commit(Transaction tx, boolean syncCommit) + public FutureResult commit(Transaction tx, boolean syncCommit) { - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit); commitFuture.commit(); return commitFuture; } - private static final class BDBCommitFuture implements StoreFuture + private static final class BDBCommitFutureResult implements FutureResult { - private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); + private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class); private final CommitThread _commitThread; private final Transaction _tx; @@ -82,7 +83,7 @@ public class CoalescingCommiter implements Committer private RuntimeException _databaseException; private boolean _complete; - public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) + public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit) { _commitThread = commitThread; _tx = tx; @@ -162,13 +163,47 @@ public class CoalescingCommiter implements Committer LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); } } + + public synchronized void waitForCompletion(long timeout) throws TimeoutException + { + long startTime= System.currentTimeMillis(); + long remaining = timeout; + + while (!isComplete() && remaining > 0) + { + _commitThread.explicitNotify(); + try + { + wait(remaining); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + if(!isComplete()) + { + remaining = (startTime + timeout) - System.currentTimeMillis(); + } + } + + if(remaining < 0l) + { + throw new TimeoutException("commit did not occur within given timeout period: " + timeout); + } + + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); + } + } } /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult} operations. The commit operations * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before * continuing, but it is the responsibility of this thread to tell the commit operations when they have been - * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. + * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#abort} methods. * * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> */ @@ -177,7 +212,7 @@ public class CoalescingCommiter implements Committer private static final Logger LOGGER = Logger.getLogger(CommitThread.class); private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); + private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>(); private final Object _lock = new Object(); private final EnvironmentFacade _environmentFacade; @@ -244,7 +279,7 @@ public class CoalescingCommiter implements Committer for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); if (commit == null) { break; @@ -261,7 +296,7 @@ public class CoalescingCommiter implements Committer for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); if (commit == null) { break; @@ -290,7 +325,7 @@ public class CoalescingCommiter implements Committer return !_jobQueue.isEmpty(); } - public void addJob(BDBCommitFuture commit, final boolean sync) + public void addJob(BDBCommitFutureResult commit, final boolean sync) { if (_stopped.get()) { @@ -313,7 +348,7 @@ public class CoalescingCommiter implements Committer { _stopped.set(true); Environment environment = _environmentFacade.getEnvironment(); - BDBCommitFuture commit; + BDBCommitFutureResult commit; if (environment != null && environment.isValid()) { environment.flushLog(true); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java index 1f05dca41a..133a0ee7d9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java @@ -22,16 +22,17 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.log4j.Logger; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; - import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.Transaction; +import org.apache.log4j.Logger; + +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.util.FutureResult; public class CommitThreadWrapper { @@ -53,16 +54,16 @@ public class CommitThreadWrapper _commitThread.join(); } - public StoreFuture commit(Transaction tx, boolean syncCommit) + public FutureResult commit(Transaction tx, boolean syncCommit) { - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit); commitFuture.commit(); return commitFuture; } - private static final class BDBCommitFuture implements StoreFuture + private static final class BDBCommitFutureResult implements FutureResult { - private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); + private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class); private final CommitThread _commitThread; private final Transaction _tx; @@ -70,7 +71,7 @@ public class CommitThreadWrapper private boolean _complete; private boolean _syncCommit; - public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) + public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit) { _commitThread = commitThread; _tx = tx; @@ -150,13 +151,48 @@ public class CommitThreadWrapper LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); } } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + long startTime = System.currentTimeMillis(); + long remaining = timeout; + + while (!isComplete() && remaining > 0) + { + _commitThread.explicitNotify(); + try + { + wait(remaining); + } + catch (InterruptedException e) + { + throw new StoreException(e); + } + if(!isComplete()) + { + remaining = (startTime + timeout) - System.currentTimeMillis(); + } + } + + if(remaining < 0) + { + throw new TimeoutException("Commit did not complete within required timeout: " + timeout); + } + + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); + } + } } /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult} operations. The commit operations * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before * continuing, but it is the responsibility of this thread to tell the commit operations when they have been - * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. + * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#abort} methods. * * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> */ @@ -165,7 +201,7 @@ public class CommitThreadWrapper private static final Logger LOGGER = Logger.getLogger(CommitThread.class); private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); + private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>(); private final CheckpointConfig _config = new CheckpointConfig(); private final Object _lock = new Object(); private Environment _environment; @@ -230,7 +266,7 @@ public class CommitThreadWrapper for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); commit.complete(); } @@ -243,7 +279,7 @@ public class CommitThreadWrapper for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); commit.abort(e); } } @@ -268,7 +304,7 @@ public class CommitThreadWrapper return !_jobQueue.isEmpty(); } - public void addJob(BDBCommitFuture commit, final boolean sync) + public void addJob(BDBCommitFutureResult commit, final boolean sync) { _jobQueue.add(commit); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java index 01e45d8ac5..9bd1aaf3e0 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.server.store.berkeleydb; -import org.apache.qpid.server.store.StoreFuture; - import com.sleepycat.je.Transaction; +import org.apache.qpid.server.util.FutureResult; + public interface Committer { void start(); - StoreFuture commit(Transaction tx, boolean syncCommit); + FutureResult commit(Transaction tx, boolean syncCommit); void stop(); -}
\ No newline at end of file +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index a42bc43a5e..e3969c467c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -27,14 +27,13 @@ import java.util.Map; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.Sequence; import com.sleepycat.je.SequenceConfig; import com.sleepycat.je.Transaction; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; public interface EnvironmentFacade { @@ -55,7 +54,7 @@ public interface EnvironmentFacade Transaction beginTransaction(); - StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync); + FutureResult commit(com.sleepycat.je.Transaction tx, boolean sync); RuntimeException handleDatabaseException(String contextMessage, RuntimeException e); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index f3a06db89c..eff652ce05 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -38,7 +38,7 @@ import com.sleepycat.je.Transaction; import org.apache.log4j.Logger; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.berkeleydb.logging.Log4jLoggingHandler; public class StandardEnvironmentFacade implements EnvironmentFacade @@ -127,7 +127,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } @Override - public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) + public FutureResult commit(com.sleepycat.je.Transaction tx, boolean syncCommit) { try { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index c151a594bf..d6dff430ad 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -73,7 +73,7 @@ import org.codehaus.jackson.map.ObjectMapper; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.berkeleydb.BDBUtils; import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry; @@ -265,7 +265,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } @Override - public StoreFuture commit(final Transaction tx, boolean syncCommit) + public FutureResult commit(final Transaction tx, boolean syncCommit) { try { @@ -283,7 +283,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { return _coalescingCommiter.commit(tx, syncCommit); } - return StoreFuture.IMMEDIATE_FUTURE; + return FutureResult.IMMEDIATE_FUTURE; } @Override |
