summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-19 17:32:33 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-19 17:32:33 +0000
commit67a9864ed236ed085e263beaea7bae2c52522331 (patch)
tree7b68eea9dfab1c7f958b68541a00dfa17da3a334 /qpid/java/bdbstore/src
parentdb70f1d2908f294fee0ed47cdb478c3ab0f3b252 (diff)
downloadqpid-python-67a9864ed236ed085e263beaea7bae2c52522331.tar.gz
Make close return a future, wait on Future in broker shutdown
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1660949 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java16
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java61
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java66
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java6
7 files changed, 118 insertions, 48 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index e8c337a578..be9248c0d2 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -51,7 +51,7 @@ import org.apache.qpid.server.store.EventManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.Xid;
@@ -924,14 +924,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*
* @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
*/
- private StoreFuture commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
+ private FutureResult commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
{
if (tx == null)
{
throw new StoreException("Fatal internal error: transactional is null at commitTran");
}
- StoreFuture result = getEnvironmentFacade().commit(tx, syncCommit);
+ FutureResult result = getEnvironmentFacade().commit(tx, syncCommit);
if (getLogger().isDebugEnabled())
{
@@ -1386,7 +1386,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- synchronized StoreFuture flushToStore()
+ synchronized FutureResult flushToStore()
{
if(!stored())
{
@@ -1407,7 +1407,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
storedSizeChangeOccurred(getMetaData().getContentSize());
}
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
@Override
@@ -1526,14 +1526,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public StoreFuture commitTranAsync() throws StoreException
+ public FutureResult commitTranAsync() throws StoreException
{
checkMessageStoreOpen();
doPreCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
- StoreFuture storeFuture = AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
+ FutureResult futureResult = AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
doPostCommitActions();
- return storeFuture;
+ return futureResult;
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
index 964335869d..2a8cf92b3d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.sleepycat.je.DatabaseException;
@@ -29,7 +30,7 @@ import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
public class CoalescingCommiter implements Committer
{
@@ -65,16 +66,16 @@ public class CoalescingCommiter implements Committer
}
@Override
- public StoreFuture commit(Transaction tx, boolean syncCommit)
+ public FutureResult commit(Transaction tx, boolean syncCommit)
{
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit);
commitFuture.commit();
return commitFuture;
}
- private static final class BDBCommitFuture implements StoreFuture
+ private static final class BDBCommitFutureResult implements FutureResult
{
- private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class);
private final CommitThread _commitThread;
private final Transaction _tx;
@@ -82,7 +83,7 @@ public class CoalescingCommiter implements Committer
private RuntimeException _databaseException;
private boolean _complete;
- public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit)
{
_commitThread = commitThread;
_tx = tx;
@@ -162,13 +163,47 @@ public class CoalescingCommiter implements Committer
LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
}
}
+
+ public synchronized void waitForCompletion(long timeout) throws TimeoutException
+ {
+ long startTime= System.currentTimeMillis();
+ long remaining = timeout;
+
+ while (!isComplete() && remaining > 0)
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(remaining);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ if(!isComplete())
+ {
+ remaining = (startTime + timeout) - System.currentTimeMillis();
+ }
+ }
+
+ if(remaining < 0l)
+ {
+ throw new TimeoutException("commit did not occur within given timeout period: " + timeout);
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
+ }
}
/**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
- * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#abort} methods.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
*/
@@ -177,7 +212,7 @@ public class CoalescingCommiter implements Committer
private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>();
private final Object _lock = new Object();
private final EnvironmentFacade _environmentFacade;
@@ -244,7 +279,7 @@ public class CoalescingCommiter implements Committer
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
if (commit == null)
{
break;
@@ -261,7 +296,7 @@ public class CoalescingCommiter implements Committer
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
if (commit == null)
{
break;
@@ -290,7 +325,7 @@ public class CoalescingCommiter implements Committer
return !_jobQueue.isEmpty();
}
- public void addJob(BDBCommitFuture commit, final boolean sync)
+ public void addJob(BDBCommitFutureResult commit, final boolean sync)
{
if (_stopped.get())
{
@@ -313,7 +348,7 @@ public class CoalescingCommiter implements Committer
{
_stopped.set(true);
Environment environment = _environmentFacade.getEnvironment();
- BDBCommitFuture commit;
+ BDBCommitFutureResult commit;
if (environment != null && environment.isValid())
{
environment.flushLog(true);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
index 1f05dca41a..133a0ee7d9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
@@ -22,16 +22,17 @@ package org.apache.qpid.server.store.berkeleydb;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
-
import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.util.FutureResult;
public class CommitThreadWrapper
{
@@ -53,16 +54,16 @@ public class CommitThreadWrapper
_commitThread.join();
}
- public StoreFuture commit(Transaction tx, boolean syncCommit)
+ public FutureResult commit(Transaction tx, boolean syncCommit)
{
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit);
commitFuture.commit();
return commitFuture;
}
- private static final class BDBCommitFuture implements StoreFuture
+ private static final class BDBCommitFutureResult implements FutureResult
{
- private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class);
private final CommitThread _commitThread;
private final Transaction _tx;
@@ -70,7 +71,7 @@ public class CommitThreadWrapper
private boolean _complete;
private boolean _syncCommit;
- public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit)
{
_commitThread = commitThread;
_tx = tx;
@@ -150,13 +151,48 @@ public class CommitThreadWrapper
LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
}
}
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+
+ while (!isComplete() && remaining > 0)
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(remaining);
+ }
+ catch (InterruptedException e)
+ {
+ throw new StoreException(e);
+ }
+ if(!isComplete())
+ {
+ remaining = (startTime + timeout) - System.currentTimeMillis();
+ }
+ }
+
+ if(remaining < 0)
+ {
+ throw new TimeoutException("Commit did not complete within required timeout: " + timeout);
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
+ }
}
/**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
- * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#abort} methods.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
*/
@@ -165,7 +201,7 @@ public class CommitThreadWrapper
private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>();
private final CheckpointConfig _config = new CheckpointConfig();
private final Object _lock = new Object();
private Environment _environment;
@@ -230,7 +266,7 @@ public class CommitThreadWrapper
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
commit.complete();
}
@@ -243,7 +279,7 @@ public class CommitThreadWrapper
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
commit.abort(e);
}
}
@@ -268,7 +304,7 @@ public class CommitThreadWrapper
return !_jobQueue.isEmpty();
}
- public void addJob(BDBCommitFuture commit, final boolean sync)
+ public void addJob(BDBCommitFutureResult commit, final boolean sync)
{
_jobQueue.add(commit);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
index 01e45d8ac5..9bd1aaf3e0 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
@@ -20,15 +20,15 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.qpid.server.store.StoreFuture;
-
import com.sleepycat.je.Transaction;
+import org.apache.qpid.server.util.FutureResult;
+
public interface Committer
{
void start();
- StoreFuture commit(Transaction tx, boolean syncCommit);
+ FutureResult commit(Transaction tx, boolean syncCommit);
void stop();
-} \ No newline at end of file
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index a42bc43a5e..e3969c467c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -27,14 +27,13 @@ import java.util.Map;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.Sequence;
import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
public interface EnvironmentFacade
{
@@ -55,7 +54,7 @@ public interface EnvironmentFacade
Transaction beginTransaction();
- StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync);
+ FutureResult commit(com.sleepycat.je.Transaction tx, boolean sync);
RuntimeException handleDatabaseException(String contextMessage, RuntimeException e);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index f3a06db89c..eff652ce05 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -38,7 +38,7 @@ import com.sleepycat.je.Transaction;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.berkeleydb.logging.Log4jLoggingHandler;
public class StandardEnvironmentFacade implements EnvironmentFacade
@@ -127,7 +127,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
}
@Override
- public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
+ public FutureResult commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
{
try
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index c151a594bf..d6dff430ad 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -73,7 +73,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.berkeleydb.BDBUtils;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry;
@@ -265,7 +265,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
@Override
- public StoreFuture commit(final Transaction tx, boolean syncCommit)
+ public FutureResult commit(final Transaction tx, boolean syncCommit)
{
try
{
@@ -283,7 +283,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
return _coalescingCommiter.commit(tx, syncCommit);
}
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
@Override