diff options
Diffstat (limited to 'qpid/java')
24 files changed, 342 insertions, 109 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index e8c337a578..be9248c0d2 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -51,7 +51,7 @@ import org.apache.qpid.server.store.EventManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.Xid; @@ -924,14 +924,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore * * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. */ - private StoreFuture commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException + private FutureResult commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException { if (tx == null) { throw new StoreException("Fatal internal error: transactional is null at commitTran"); } - StoreFuture result = getEnvironmentFacade().commit(tx, syncCommit); + FutureResult result = getEnvironmentFacade().commit(tx, syncCommit); if (getLogger().isDebugEnabled()) { @@ -1386,7 +1386,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - synchronized StoreFuture flushToStore() + synchronized FutureResult flushToStore() { if(!stored()) { @@ -1407,7 +1407,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore storedSizeChangeOccurred(getMetaData().getContentSize()); } - return StoreFuture.IMMEDIATE_FUTURE; + return FutureResult.IMMEDIATE_FUTURE; } @Override @@ -1526,14 +1526,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public StoreFuture commitTranAsync() throws StoreException + public FutureResult commitTranAsync() throws StoreException { checkMessageStoreOpen(); doPreCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); - StoreFuture storeFuture = AbstractBDBMessageStore.this.commitTranImpl(_txn, false); + FutureResult futureResult = AbstractBDBMessageStore.this.commitTranImpl(_txn, false); doPostCommitActions(); - return storeFuture; + return futureResult; } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java index 964335869d..2a8cf92b3d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import com.sleepycat.je.DatabaseException; @@ -29,7 +30,7 @@ import com.sleepycat.je.Environment; import com.sleepycat.je.Transaction; import org.apache.log4j.Logger; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; public class CoalescingCommiter implements Committer { @@ -65,16 +66,16 @@ public class CoalescingCommiter implements Committer } @Override - public StoreFuture commit(Transaction tx, boolean syncCommit) + public FutureResult commit(Transaction tx, boolean syncCommit) { - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit); commitFuture.commit(); return commitFuture; } - private static final class BDBCommitFuture implements StoreFuture + private static final class BDBCommitFutureResult implements FutureResult { - private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); + private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class); private final CommitThread _commitThread; private final Transaction _tx; @@ -82,7 +83,7 @@ public class CoalescingCommiter implements Committer private RuntimeException _databaseException; private boolean _complete; - public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) + public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit) { _commitThread = commitThread; _tx = tx; @@ -162,13 +163,47 @@ public class CoalescingCommiter implements Committer LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); } } + + public synchronized void waitForCompletion(long timeout) throws TimeoutException + { + long startTime= System.currentTimeMillis(); + long remaining = timeout; + + while (!isComplete() && remaining > 0) + { + _commitThread.explicitNotify(); + try + { + wait(remaining); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + if(!isComplete()) + { + remaining = (startTime + timeout) - System.currentTimeMillis(); + } + } + + if(remaining < 0l) + { + throw new TimeoutException("commit did not occur within given timeout period: " + timeout); + } + + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); + } + } } /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult} operations. The commit operations * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before * continuing, but it is the responsibility of this thread to tell the commit operations when they have been - * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. + * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#abort} methods. * * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> */ @@ -177,7 +212,7 @@ public class CoalescingCommiter implements Committer private static final Logger LOGGER = Logger.getLogger(CommitThread.class); private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); + private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>(); private final Object _lock = new Object(); private final EnvironmentFacade _environmentFacade; @@ -244,7 +279,7 @@ public class CoalescingCommiter implements Committer for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); if (commit == null) { break; @@ -261,7 +296,7 @@ public class CoalescingCommiter implements Committer for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); if (commit == null) { break; @@ -290,7 +325,7 @@ public class CoalescingCommiter implements Committer return !_jobQueue.isEmpty(); } - public void addJob(BDBCommitFuture commit, final boolean sync) + public void addJob(BDBCommitFutureResult commit, final boolean sync) { if (_stopped.get()) { @@ -313,7 +348,7 @@ public class CoalescingCommiter implements Committer { _stopped.set(true); Environment environment = _environmentFacade.getEnvironment(); - BDBCommitFuture commit; + BDBCommitFutureResult commit; if (environment != null && environment.isValid()) { environment.flushLog(true); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java index 1f05dca41a..133a0ee7d9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java @@ -22,16 +22,17 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.log4j.Logger; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; - import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.Transaction; +import org.apache.log4j.Logger; + +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.util.FutureResult; public class CommitThreadWrapper { @@ -53,16 +54,16 @@ public class CommitThreadWrapper _commitThread.join(); } - public StoreFuture commit(Transaction tx, boolean syncCommit) + public FutureResult commit(Transaction tx, boolean syncCommit) { - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit); commitFuture.commit(); return commitFuture; } - private static final class BDBCommitFuture implements StoreFuture + private static final class BDBCommitFutureResult implements FutureResult { - private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); + private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class); private final CommitThread _commitThread; private final Transaction _tx; @@ -70,7 +71,7 @@ public class CommitThreadWrapper private boolean _complete; private boolean _syncCommit; - public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) + public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit) { _commitThread = commitThread; _tx = tx; @@ -150,13 +151,48 @@ public class CommitThreadWrapper LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); } } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + long startTime = System.currentTimeMillis(); + long remaining = timeout; + + while (!isComplete() && remaining > 0) + { + _commitThread.explicitNotify(); + try + { + wait(remaining); + } + catch (InterruptedException e) + { + throw new StoreException(e); + } + if(!isComplete()) + { + remaining = (startTime + timeout) - System.currentTimeMillis(); + } + } + + if(remaining < 0) + { + throw new TimeoutException("Commit did not complete within required timeout: " + timeout); + } + + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); + } + } } /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult} operations. The commit operations * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before * continuing, but it is the responsibility of this thread to tell the commit operations when they have been - * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. + * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#abort} methods. * * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> */ @@ -165,7 +201,7 @@ public class CommitThreadWrapper private static final Logger LOGGER = Logger.getLogger(CommitThread.class); private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); + private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>(); private final CheckpointConfig _config = new CheckpointConfig(); private final Object _lock = new Object(); private Environment _environment; @@ -230,7 +266,7 @@ public class CommitThreadWrapper for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); commit.complete(); } @@ -243,7 +279,7 @@ public class CommitThreadWrapper for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); commit.abort(e); } } @@ -268,7 +304,7 @@ public class CommitThreadWrapper return !_jobQueue.isEmpty(); } - public void addJob(BDBCommitFuture commit, final boolean sync) + public void addJob(BDBCommitFutureResult commit, final boolean sync) { _jobQueue.add(commit); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java index 01e45d8ac5..9bd1aaf3e0 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.server.store.berkeleydb; -import org.apache.qpid.server.store.StoreFuture; - import com.sleepycat.je.Transaction; +import org.apache.qpid.server.util.FutureResult; + public interface Committer { void start(); - StoreFuture commit(Transaction tx, boolean syncCommit); + FutureResult commit(Transaction tx, boolean syncCommit); void stop(); -}
\ No newline at end of file +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index a42bc43a5e..e3969c467c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -27,14 +27,13 @@ import java.util.Map; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.Sequence; import com.sleepycat.je.SequenceConfig; import com.sleepycat.je.Transaction; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; public interface EnvironmentFacade { @@ -55,7 +54,7 @@ public interface EnvironmentFacade Transaction beginTransaction(); - StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync); + FutureResult commit(com.sleepycat.je.Transaction tx, boolean sync); RuntimeException handleDatabaseException(String contextMessage, RuntimeException e); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index f3a06db89c..eff652ce05 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -38,7 +38,7 @@ import com.sleepycat.je.Transaction; import org.apache.log4j.Logger; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.berkeleydb.logging.Log4jLoggingHandler; public class StandardEnvironmentFacade implements EnvironmentFacade @@ -127,7 +127,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } @Override - public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) + public FutureResult commit(com.sleepycat.je.Transaction tx, boolean syncCommit) { try { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index c151a594bf..d6dff430ad 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -73,7 +73,7 @@ import org.codehaus.jackson.map.ObjectMapper; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.berkeleydb.BDBUtils; import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry; @@ -265,7 +265,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } @Override - public StoreFuture commit(final Transaction tx, boolean syncCommit) + public FutureResult commit(final Transaction tx, boolean syncCommit) { try { @@ -283,7 +283,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { return _coalescingCommiter.commit(tx, syncCommit); } - return StoreFuture.IMMEDIATE_FUTURE; + return FutureResult.IMMEDIATE_FUTURE; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java index c18923ffe0..6c50fe7cfd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java @@ -26,6 +26,7 @@ import java.io.InputStream; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.Properties; +import java.util.concurrent.TimeoutException; import javax.security.auth.Subject; @@ -49,6 +50,7 @@ import org.apache.qpid.server.plugin.PluggableFactoryLoader; import org.apache.qpid.server.plugin.SystemConfigFactory; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.FutureResult; public class Broker implements BrokerShutdownProvider { @@ -102,11 +104,16 @@ public class Broker implements BrokerShutdownProvider { if(_systemConfig != null) { - _systemConfig.close(); + final FutureResult closeResult = _systemConfig.close(); + closeResult.waitForCompletion(5000l); } _taskExecutor.stop(); } + catch (TimeoutException e) + { + LOGGER.warn("Attempting to cleanly shutdown took too long, exiting immediately"); + } finally { if (_configuringOwnLogging) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java index fd6f3385c6..52fcf07e25 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.util.FutureResult; public interface ConsumerImpl { @@ -65,7 +66,7 @@ public interface ConsumerImpl boolean seesRequeues(); - void close(); + FutureResult close(); boolean trySendLock(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index baf465f6d1..aef769dc4f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -43,6 +43,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; @@ -68,6 +69,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.encryption.ConfigurationSecretEncrypter; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.util.Strings; @@ -457,14 +459,15 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - protected void closeChildren() + protected FutureResult closeChildren() { + final List<FutureResult> childCloseFutures = new ArrayList<>(); applyToChildren(new Action<ConfiguredObject<?>>() { @Override public void performAction(final ConfiguredObject<?> child) { - child.close(); + childCloseFutures.add(child.close()); } }); @@ -483,13 +486,67 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im childNameMap.clear(); } + + FutureResult futureResult; + if(childCloseFutures.isEmpty()) + { + futureResult = FutureResult.IMMEDIATE_FUTURE; + } + else + { + futureResult = new FutureResult() + { + @Override + public boolean isComplete() + { + for(FutureResult childResult : childCloseFutures) + { + if(!childResult.isComplete()) + { + return false; + } + } + return true; + } + + @Override + public void waitForCompletion() + { + for(FutureResult childResult : childCloseFutures) + { + childResult.waitForCompletion(); + } + } + + + @Override + public void waitForCompletion(long timeout) throws TimeoutException + { + long startTime = System.currentTimeMillis(); + long remaining = timeout; + for(FutureResult childResult : childCloseFutures) + { + + childResult.waitForCompletion(remaining); + remaining = startTime + timeout - System.currentTimeMillis(); + if(remaining < 0) + { + throw new TimeoutException("Completion did not occur within specified timeout: " + timeout); + } + } + } + }; + } + return futureResult; } @Override - public final void close() + public final FutureResult close() { if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) { + final CloseResult closeResult = new CloseResult(); + CloseFuture close = beforeClose(); Runnable closeRunnable = new Runnable() @@ -497,7 +554,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im @Override public void run() { - closeChildren(); + final FutureResult result = closeChildren(); + closeResult.setChildFutureResult(result); onClose(); unregister(false); @@ -514,7 +572,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } // if future not complete, schedule the remainder to be done once complete. - + return closeResult; + } + else + { + return FutureResult.IMMEDIATE_FUTURE; } } @@ -1899,6 +1961,72 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } + private static class CloseResult implements FutureResult + { + private volatile FutureResult _childFutureResult; + + @Override + public boolean isComplete() + { + return _childFutureResult != null && _childFutureResult.isComplete(); + } + + @Override + public void waitForCompletion() + { + synchronized (this) + { + while (_childFutureResult == null) + { + try + { + wait(); + } + catch (InterruptedException e) + { + + } + } + } + _childFutureResult.waitForCompletion(); + } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + long startTime = System.currentTimeMillis(); + long remaining = timeout; + + synchronized (this) + { + while (_childFutureResult == null && remaining > 0) + { + try + { + wait(remaining); + } + catch (InterruptedException e) + { + + } + remaining = startTime + timeout - System.currentTimeMillis(); + + if(remaining < 0) + { + throw new TimeoutException("Completion did not occur within given tiemout: " + timeout); + } + } + } + _childFutureResult.waitForCompletion(remaining); + } + + public synchronized void setChildFutureResult(final FutureResult childFutureResult) + { + _childFutureResult = childFutureResult; + notifyAll(); + } + } + private class AttributeGettingHandler implements InvocationHandler { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index 7079461a09..89fda6798b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -28,6 +28,7 @@ import java.util.UUID; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.util.FutureResult; @ManagedObject( creatable = false, category = false ) /** @@ -246,7 +247,7 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> void open(); - void close(); + FutureResult close(); TaskExecutor getTaskExecutor(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 4dfaa716cf..5868ae61c5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -45,6 +45,7 @@ import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; +import org.apache.qpid.server.util.FutureResult; public abstract class AbstractJDBCMessageStore implements MessageStore { @@ -834,10 +835,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } } - private StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException + private FutureResult commitTranAsync(ConnectionWrapper connWrapper) throws StoreException { commitTran(connWrapper); - return StoreFuture.IMMEDIATE_FUTURE; + return FutureResult.IMMEDIATE_FUTURE; } private void abortTran(ConnectionWrapper connWrapper) throws StoreException @@ -1231,14 +1232,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public StoreFuture commitTranAsync() + public FutureResult commitTranAsync() { checkMessageStoreOpen(); doPreCommitActions(); - StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper); + FutureResult futureResult = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper); storedSizeChange(_storeSizeIncrease); doPostCommitActions(); - return storeFuture; + return futureResult; } private void doPreCommitActions() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index efe040fbb3..eb887b4ef5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; +import org.apache.qpid.server.util.FutureResult; /** A simple message store that stores the messages in a thread-safe structure in memory. */ public class MemoryMessageStore implements MessageStore @@ -58,9 +59,9 @@ public class MemoryMessageStore implements MessageStore private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>(); @Override - public StoreFuture commitTranAsync() + public FutureResult commitTranAsync() { - return StoreFuture.IMMEDIATE_FUTURE; + return FutureResult.IMMEDIATE_FUTURE; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java index 6f7afccac0..007f3ab796 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.util.FutureResult; public interface Transaction { @@ -53,7 +54,7 @@ public interface Transaction * Commits all operations performed within a given transactional context. * */ - StoreFuture commitTranAsync(); + FutureResult commitTranAsync(); /** * Abandons all operations performed within a given transactional context. @@ -72,4 +73,4 @@ public interface Transaction void recordXid(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues); -}
\ No newline at end of file +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 65064b015c..809c234cc6 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -30,7 +30,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -55,7 +55,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction public static interface FutureRecorder { - public void recordFuture(StoreFuture future, Action action); + public void recordFuture(FutureResult future, Action action); } @@ -83,7 +83,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction */ public void addPostTransactionAction(final Action immediateAction) { - addFuture(StoreFuture.IMMEDIATE_FUTURE, immediateAction); + addFuture(FutureResult.IMMEDIATE_FUTURE, immediateAction); } @@ -92,7 +92,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - StoreFuture future; + FutureResult future; if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) @@ -108,7 +108,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -120,7 +120,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - private void addFuture(final StoreFuture future, final Action action) + private void addFuture(final FutureResult future, final Action action) { if(action != null) { @@ -135,7 +135,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent) + private void addEnqueueFuture(final FutureResult future, final Action action, boolean persistent) { if(action != null) { @@ -178,7 +178,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - StoreFuture future; + FutureResult future; if(txn != null) { future = txn.commitTranAsync(); @@ -186,7 +186,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -204,7 +204,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - StoreFuture future; + FutureResult future; if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) @@ -219,7 +219,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; @@ -255,7 +255,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - StoreFuture future; + FutureResult future; if (txn != null) { future = txn.commitTranAsync(); @@ -263,7 +263,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; @@ -281,7 +281,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { if(immediatePostTransactionAction != null) { - addFuture(StoreFuture.IMMEDIATE_FUTURE, new Action() + addFuture(FutureResult.IMMEDIATE_FUTURE, new Action() { public void postCommit() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 349ec793fe..b800556312 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.txn; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +33,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -53,7 +54,7 @@ public class LocalTransaction implements ServerTransaction private final MessageStore _transactionLog; private volatile long _txnStartTime = 0L; private volatile long _txnUpdateTime = 0l; - private StoreFuture _asyncTran; + private FutureResult _asyncTran; public LocalTransaction(MessageStore transactionLog) { @@ -271,16 +272,16 @@ public class LocalTransaction implements ServerTransaction } } - public StoreFuture commitAsync(final Runnable deferred) + public FutureResult commitAsync(final Runnable deferred) { sync(); - StoreFuture future = StoreFuture.IMMEDIATE_FUTURE; + FutureResult future = FutureResult.IMMEDIATE_FUTURE; if(_transaction != null) { - future = new StoreFuture() + future = new FutureResult() { private volatile boolean _completed = false; - private StoreFuture _underlying = _transaction.commitTranAsync(); + private FutureResult _underlying = _transaction.commitTranAsync(); @Override public boolean isComplete() @@ -298,6 +299,17 @@ public class LocalTransaction implements ServerTransaction } } + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + + if(!_completed) + { + _underlying.waitForCompletion(timeout); + checkUnderlyingCompletion(); + } + } + private synchronized boolean checkUnderlyingCompletion() { if(!_completed && _underlying.isComplete()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java index 7d3bf90a75..2aab3081ee 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java @@ -18,11 +18,13 @@ * under the License. * */ -package org.apache.qpid.server.store; +package org.apache.qpid.server.util; -public interface StoreFuture +import java.util.concurrent.TimeoutException; + +public interface FutureResult { - StoreFuture IMMEDIATE_FUTURE = new StoreFuture() + FutureResult IMMEDIATE_FUTURE = new FutureResult() { public boolean isComplete() { @@ -32,9 +34,17 @@ public interface StoreFuture public void waitForCompletion() { } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + + } }; boolean isComplete(); void waitForCompletion(); + + void waitForCompletion(long timeout) throws TimeoutException; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java index ec0908efba..a61ac4f5d2 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java @@ -27,7 +27,7 @@ import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction.FutureRecorder; import org.apache.qpid.server.txn.ServerTransaction.Action; @@ -43,7 +43,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase private MessageStore _messageStore = mock(MessageStore.class); private Transaction _storeTransaction = mock(Transaction.class); private Action _postTransactionAction = mock(Action.class); - private StoreFuture _future = mock(StoreFuture.class); + private FutureResult _future = mock(FutureResult.class); @Override @@ -136,7 +136,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); verifyZeroInteractions(_storeTransaction); - verify(_futureRecorder).recordFuture(StoreFuture.IMMEDIATE_FUTURE, _postTransactionAction); + verify(_futureRecorder).recordFuture(FutureResult.IMMEDIATE_FUTURE, _postTransactionAction); verifyZeroInteractions(_postTransactionAction); } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java index da868a01f1..6fcfde0221 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -24,7 +24,7 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.NullMessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -96,7 +96,7 @@ class MockStoreTransaction implements Transaction _state = TransactionState.COMMITTED; } - public StoreFuture commitTranAsync() + public FutureResult commitTranAsync() { throw new NotImplementedException(); } @@ -126,4 +126,4 @@ class MockStoreTransaction implements Transaction } }; } -}
\ No newline at end of file +} diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 579c885053..50b957d066 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -75,7 +75,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AlreadyKnownDtxException; @@ -1012,17 +1012,17 @@ public class ServerSession extends Session return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast(); } - public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) + public void recordFuture(final FutureResult future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } private static class AsyncCommand { - private final StoreFuture _future; + private final FutureResult _future; private ServerTransaction.Action _action; - public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) + public AsyncCommand(final FutureResult future, final ServerTransaction.Action action) { _future = future; _action = action; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 95d54579a7..c42630d0c6 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -57,7 +57,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.DtxNotSelectedException; @@ -132,7 +132,7 @@ public class ServerSessionDelegate extends SessionDelegate serverSession.accept(method.getTransfers()); if(!serverSession.isTransactional()) { - serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, + serverSession.recordFuture(FutureResult.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, method)); } } @@ -433,7 +433,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, + serverSession.recordFuture(FutureResult.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr)); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 8736bbeb3b..7c79e00c0b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -97,7 +97,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; @@ -1782,7 +1782,7 @@ public class AMQChannel } } - public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) + public void recordFuture(final FutureResult future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } @@ -1808,10 +1808,10 @@ public class AMQChannel private static class AsyncCommand { - private final StoreFuture _future; + private final FutureResult _future; private ServerTransaction.Action _action; - public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) + public AsyncCommand(final FutureResult future, final ServerTransaction.Action action) { _future = future; _action = action; diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java index ce612ec0b6..63c60d7400 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.server.store.AbstractJDBCMessageStore @@ -131,7 +131,7 @@ public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.se } @Override - public StoreFuture commitTranAsync() + public FutureResult commitTranAsync() { try { diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index c03dc4e1be..701c704fb6 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.util.StateChangeListener; class ManagementNodeConsumer implements ConsumerImpl @@ -122,9 +123,9 @@ class ManagementNodeConsumer implements ConsumerImpl } @Override - public void close() + public FutureResult close() { - + return FutureResult.IMMEDIATE_FUTURE; } @Override |
