summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java16
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java61
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java66
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java138
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java5
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java28
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java24
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java (renamed from qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java)16
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java8
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java4
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java5
24 files changed, 342 insertions, 109 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index e8c337a578..be9248c0d2 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -51,7 +51,7 @@ import org.apache.qpid.server.store.EventManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.Xid;
@@ -924,14 +924,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*
* @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
*/
- private StoreFuture commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
+ private FutureResult commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
{
if (tx == null)
{
throw new StoreException("Fatal internal error: transactional is null at commitTran");
}
- StoreFuture result = getEnvironmentFacade().commit(tx, syncCommit);
+ FutureResult result = getEnvironmentFacade().commit(tx, syncCommit);
if (getLogger().isDebugEnabled())
{
@@ -1386,7 +1386,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- synchronized StoreFuture flushToStore()
+ synchronized FutureResult flushToStore()
{
if(!stored())
{
@@ -1407,7 +1407,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
storedSizeChangeOccurred(getMetaData().getContentSize());
}
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
@Override
@@ -1526,14 +1526,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public StoreFuture commitTranAsync() throws StoreException
+ public FutureResult commitTranAsync() throws StoreException
{
checkMessageStoreOpen();
doPreCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
- StoreFuture storeFuture = AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
+ FutureResult futureResult = AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
doPostCommitActions();
- return storeFuture;
+ return futureResult;
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
index 964335869d..2a8cf92b3d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.sleepycat.je.DatabaseException;
@@ -29,7 +30,7 @@ import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
public class CoalescingCommiter implements Committer
{
@@ -65,16 +66,16 @@ public class CoalescingCommiter implements Committer
}
@Override
- public StoreFuture commit(Transaction tx, boolean syncCommit)
+ public FutureResult commit(Transaction tx, boolean syncCommit)
{
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit);
commitFuture.commit();
return commitFuture;
}
- private static final class BDBCommitFuture implements StoreFuture
+ private static final class BDBCommitFutureResult implements FutureResult
{
- private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class);
private final CommitThread _commitThread;
private final Transaction _tx;
@@ -82,7 +83,7 @@ public class CoalescingCommiter implements Committer
private RuntimeException _databaseException;
private boolean _complete;
- public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit)
{
_commitThread = commitThread;
_tx = tx;
@@ -162,13 +163,47 @@ public class CoalescingCommiter implements Committer
LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
}
}
+
+ public synchronized void waitForCompletion(long timeout) throws TimeoutException
+ {
+ long startTime= System.currentTimeMillis();
+ long remaining = timeout;
+
+ while (!isComplete() && remaining > 0)
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(remaining);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ if(!isComplete())
+ {
+ remaining = (startTime + timeout) - System.currentTimeMillis();
+ }
+ }
+
+ if(remaining < 0l)
+ {
+ throw new TimeoutException("commit did not occur within given timeout period: " + timeout);
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
+ }
}
/**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
- * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#abort} methods.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
*/
@@ -177,7 +212,7 @@ public class CoalescingCommiter implements Committer
private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>();
private final Object _lock = new Object();
private final EnvironmentFacade _environmentFacade;
@@ -244,7 +279,7 @@ public class CoalescingCommiter implements Committer
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
if (commit == null)
{
break;
@@ -261,7 +296,7 @@ public class CoalescingCommiter implements Committer
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
if (commit == null)
{
break;
@@ -290,7 +325,7 @@ public class CoalescingCommiter implements Committer
return !_jobQueue.isEmpty();
}
- public void addJob(BDBCommitFuture commit, final boolean sync)
+ public void addJob(BDBCommitFutureResult commit, final boolean sync)
{
if (_stopped.get())
{
@@ -313,7 +348,7 @@ public class CoalescingCommiter implements Committer
{
_stopped.set(true);
Environment environment = _environmentFacade.getEnvironment();
- BDBCommitFuture commit;
+ BDBCommitFutureResult commit;
if (environment != null && environment.isValid())
{
environment.flushLog(true);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
index 1f05dca41a..133a0ee7d9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
@@ -22,16 +22,17 @@ package org.apache.qpid.server.store.berkeleydb;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
-
import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.util.FutureResult;
public class CommitThreadWrapper
{
@@ -53,16 +54,16 @@ public class CommitThreadWrapper
_commitThread.join();
}
- public StoreFuture commit(Transaction tx, boolean syncCommit)
+ public FutureResult commit(Transaction tx, boolean syncCommit)
{
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit);
commitFuture.commit();
return commitFuture;
}
- private static final class BDBCommitFuture implements StoreFuture
+ private static final class BDBCommitFutureResult implements FutureResult
{
- private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class);
private final CommitThread _commitThread;
private final Transaction _tx;
@@ -70,7 +71,7 @@ public class CommitThreadWrapper
private boolean _complete;
private boolean _syncCommit;
- public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit)
{
_commitThread = commitThread;
_tx = tx;
@@ -150,13 +151,48 @@ public class CommitThreadWrapper
LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
}
}
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+
+ while (!isComplete() && remaining > 0)
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(remaining);
+ }
+ catch (InterruptedException e)
+ {
+ throw new StoreException(e);
+ }
+ if(!isComplete())
+ {
+ remaining = (startTime + timeout) - System.currentTimeMillis();
+ }
+ }
+
+ if(remaining < 0)
+ {
+ throw new TimeoutException("Commit did not complete within required timeout: " + timeout);
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
+ }
}
/**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
- * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#abort} methods.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
*/
@@ -165,7 +201,7 @@ public class CommitThreadWrapper
private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>();
private final CheckpointConfig _config = new CheckpointConfig();
private final Object _lock = new Object();
private Environment _environment;
@@ -230,7 +266,7 @@ public class CommitThreadWrapper
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
commit.complete();
}
@@ -243,7 +279,7 @@ public class CommitThreadWrapper
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
commit.abort(e);
}
}
@@ -268,7 +304,7 @@ public class CommitThreadWrapper
return !_jobQueue.isEmpty();
}
- public void addJob(BDBCommitFuture commit, final boolean sync)
+ public void addJob(BDBCommitFutureResult commit, final boolean sync)
{
_jobQueue.add(commit);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
index 01e45d8ac5..9bd1aaf3e0 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
@@ -20,15 +20,15 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.qpid.server.store.StoreFuture;
-
import com.sleepycat.je.Transaction;
+import org.apache.qpid.server.util.FutureResult;
+
public interface Committer
{
void start();
- StoreFuture commit(Transaction tx, boolean syncCommit);
+ FutureResult commit(Transaction tx, boolean syncCommit);
void stop();
-} \ No newline at end of file
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index a42bc43a5e..e3969c467c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -27,14 +27,13 @@ import java.util.Map;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.Sequence;
import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
public interface EnvironmentFacade
{
@@ -55,7 +54,7 @@ public interface EnvironmentFacade
Transaction beginTransaction();
- StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync);
+ FutureResult commit(com.sleepycat.je.Transaction tx, boolean sync);
RuntimeException handleDatabaseException(String contextMessage, RuntimeException e);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index f3a06db89c..eff652ce05 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -38,7 +38,7 @@ import com.sleepycat.je.Transaction;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.berkeleydb.logging.Log4jLoggingHandler;
public class StandardEnvironmentFacade implements EnvironmentFacade
@@ -127,7 +127,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
}
@Override
- public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
+ public FutureResult commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
{
try
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index c151a594bf..d6dff430ad 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -73,7 +73,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.berkeleydb.BDBUtils;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry;
@@ -265,7 +265,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
@Override
- public StoreFuture commit(final Transaction tx, boolean syncCommit)
+ public FutureResult commit(final Transaction tx, boolean syncCommit)
{
try
{
@@ -283,7 +283,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
return _coalescingCommiter.commit(tx, syncCommit);
}
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
index c18923ffe0..6c50fe7cfd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;
+import java.util.concurrent.TimeoutException;
import javax.security.auth.Subject;
@@ -49,6 +50,7 @@ import org.apache.qpid.server.plugin.PluggableFactoryLoader;
import org.apache.qpid.server.plugin.SystemConfigFactory;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.FutureResult;
public class Broker implements BrokerShutdownProvider
{
@@ -102,11 +104,16 @@ public class Broker implements BrokerShutdownProvider
{
if(_systemConfig != null)
{
- _systemConfig.close();
+ final FutureResult closeResult = _systemConfig.close();
+ closeResult.waitForCompletion(5000l);
}
_taskExecutor.stop();
}
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Attempting to cleanly shutdown took too long, exiting immediately");
+ }
finally
{
if (_configuringOwnLogging)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
index fd6f3385c6..52fcf07e25 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.FutureResult;
public interface ConsumerImpl
{
@@ -65,7 +66,7 @@ public interface ConsumerImpl
boolean seesRequeues();
- void close();
+ FutureResult close();
boolean trySendLock();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index baf465f6d1..aef769dc4f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -43,6 +43,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
@@ -68,6 +69,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.encryption.ConfigurationSecretEncrypter;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.util.Strings;
@@ -457,14 +459,15 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- protected void closeChildren()
+ protected FutureResult closeChildren()
{
+ final List<FutureResult> childCloseFutures = new ArrayList<>();
applyToChildren(new Action<ConfiguredObject<?>>()
{
@Override
public void performAction(final ConfiguredObject<?> child)
{
- child.close();
+ childCloseFutures.add(child.close());
}
});
@@ -483,13 +486,67 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
childNameMap.clear();
}
+
+ FutureResult futureResult;
+ if(childCloseFutures.isEmpty())
+ {
+ futureResult = FutureResult.IMMEDIATE_FUTURE;
+ }
+ else
+ {
+ futureResult = new FutureResult()
+ {
+ @Override
+ public boolean isComplete()
+ {
+ for(FutureResult childResult : childCloseFutures)
+ {
+ if(!childResult.isComplete())
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void waitForCompletion()
+ {
+ for(FutureResult childResult : childCloseFutures)
+ {
+ childResult.waitForCompletion();
+ }
+ }
+
+
+ @Override
+ public void waitForCompletion(long timeout) throws TimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+ for(FutureResult childResult : childCloseFutures)
+ {
+
+ childResult.waitForCompletion(remaining);
+ remaining = startTime + timeout - System.currentTimeMillis();
+ if(remaining < 0)
+ {
+ throw new TimeoutException("Completion did not occur within specified timeout: " + timeout);
+ }
+ }
+ }
+ };
+ }
+ return futureResult;
}
@Override
- public final void close()
+ public final FutureResult close()
{
if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
{
+ final CloseResult closeResult = new CloseResult();
+
CloseFuture close = beforeClose();
Runnable closeRunnable = new Runnable()
@@ -497,7 +554,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@Override
public void run()
{
- closeChildren();
+ final FutureResult result = closeChildren();
+ closeResult.setChildFutureResult(result);
onClose();
unregister(false);
@@ -514,7 +572,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
// if future not complete, schedule the remainder to be done once complete.
-
+ return closeResult;
+ }
+ else
+ {
+ return FutureResult.IMMEDIATE_FUTURE;
}
}
@@ -1899,6 +1961,72 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
+ private static class CloseResult implements FutureResult
+ {
+ private volatile FutureResult _childFutureResult;
+
+ @Override
+ public boolean isComplete()
+ {
+ return _childFutureResult != null && _childFutureResult.isComplete();
+ }
+
+ @Override
+ public void waitForCompletion()
+ {
+ synchronized (this)
+ {
+ while (_childFutureResult == null)
+ {
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+ }
+ }
+ _childFutureResult.waitForCompletion();
+ }
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+
+ synchronized (this)
+ {
+ while (_childFutureResult == null && remaining > 0)
+ {
+ try
+ {
+ wait(remaining);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+ remaining = startTime + timeout - System.currentTimeMillis();
+
+ if(remaining < 0)
+ {
+ throw new TimeoutException("Completion did not occur within given tiemout: " + timeout);
+ }
+ }
+ }
+ _childFutureResult.waitForCompletion(remaining);
+ }
+
+ public synchronized void setChildFutureResult(final FutureResult childFutureResult)
+ {
+ _childFutureResult = childFutureResult;
+ notifyAll();
+ }
+ }
+
private class AttributeGettingHandler implements InvocationHandler
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
index 7079461a09..89fda6798b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.util.FutureResult;
@ManagedObject( creatable = false, category = false )
/**
@@ -246,7 +247,7 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
void open();
- void close();
+ FutureResult close();
TaskExecutor getTaskExecutor();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 4dfaa716cf..5868ae61c5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -45,6 +45,7 @@ import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.util.FutureResult;
public abstract class AbstractJDBCMessageStore implements MessageStore
{
@@ -834,10 +835,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
}
- private StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
+ private FutureResult commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
{
commitTran(connWrapper);
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
private void abortTran(ConnectionWrapper connWrapper) throws StoreException
@@ -1231,14 +1232,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public StoreFuture commitTranAsync()
+ public FutureResult commitTranAsync()
{
checkMessageStoreOpen();
doPreCommitActions();
- StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
+ FutureResult futureResult = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
storedSizeChange(_storeSizeIncrease);
doPostCommitActions();
- return storeFuture;
+ return futureResult;
}
private void doPreCommitActions()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index efe040fbb3..eb887b4ef5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.util.FutureResult;
/** A simple message store that stores the messages in a thread-safe structure in memory. */
public class MemoryMessageStore implements MessageStore
@@ -58,9 +59,9 @@ public class MemoryMessageStore implements MessageStore
private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
@Override
- public StoreFuture commitTranAsync()
+ public FutureResult commitTranAsync()
{
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
index 6f7afccac0..007f3ab796 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.util.FutureResult;
public interface Transaction
{
@@ -53,7 +54,7 @@ public interface Transaction
* Commits all operations performed within a given transactional context.
*
*/
- StoreFuture commitTranAsync();
+ FutureResult commitTranAsync();
/**
* Abandons all operations performed within a given transactional context.
@@ -72,4 +73,4 @@ public interface Transaction
void recordXid(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues,
Transaction.Record[] dequeues);
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index 65064b015c..809c234cc6 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -30,7 +30,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -55,7 +55,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
public static interface FutureRecorder
{
- public void recordFuture(StoreFuture future, Action action);
+ public void recordFuture(FutureResult future, Action action);
}
@@ -83,7 +83,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
*/
public void addPostTransactionAction(final Action immediateAction)
{
- addFuture(StoreFuture.IMMEDIATE_FUTURE, immediateAction);
+ addFuture(FutureResult.IMMEDIATE_FUTURE, immediateAction);
}
@@ -92,7 +92,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
Transaction txn = null;
try
{
- StoreFuture future;
+ FutureResult future;
if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
@@ -108,7 +108,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -120,7 +120,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
- private void addFuture(final StoreFuture future, final Action action)
+ private void addFuture(final FutureResult future, final Action action)
{
if(action != null)
{
@@ -135,7 +135,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
- private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent)
+ private void addEnqueueFuture(final FutureResult future, final Action action, boolean persistent)
{
if(action != null)
{
@@ -178,7 +178,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
- StoreFuture future;
+ FutureResult future;
if(txn != null)
{
future = txn.commitTranAsync();
@@ -186,7 +186,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -204,7 +204,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
Transaction txn = null;
try
{
- StoreFuture future;
+ FutureResult future;
if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
@@ -219,7 +219,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;
@@ -255,7 +255,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
- StoreFuture future;
+ FutureResult future;
if (txn != null)
{
future = txn.commitTranAsync();
@@ -263,7 +263,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;
@@ -281,7 +281,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
{
if(immediatePostTransactionAction != null)
{
- addFuture(StoreFuture.IMMEDIATE_FUTURE, new Action()
+ addFuture(FutureResult.IMMEDIATE_FUTURE, new Action()
{
public void postCommit()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 349ec793fe..b800556312 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.txn;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +33,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -53,7 +54,7 @@ public class LocalTransaction implements ServerTransaction
private final MessageStore _transactionLog;
private volatile long _txnStartTime = 0L;
private volatile long _txnUpdateTime = 0l;
- private StoreFuture _asyncTran;
+ private FutureResult _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
@@ -271,16 +272,16 @@ public class LocalTransaction implements ServerTransaction
}
}
- public StoreFuture commitAsync(final Runnable deferred)
+ public FutureResult commitAsync(final Runnable deferred)
{
sync();
- StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
+ FutureResult future = FutureResult.IMMEDIATE_FUTURE;
if(_transaction != null)
{
- future = new StoreFuture()
+ future = new FutureResult()
{
private volatile boolean _completed = false;
- private StoreFuture _underlying = _transaction.commitTranAsync();
+ private FutureResult _underlying = _transaction.commitTranAsync();
@Override
public boolean isComplete()
@@ -298,6 +299,17 @@ public class LocalTransaction implements ServerTransaction
}
}
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+
+ if(!_completed)
+ {
+ _underlying.waitForCompletion(timeout);
+ checkUnderlyingCompletion();
+ }
+ }
+
private synchronized boolean checkUnderlyingCompletion()
{
if(!_completed && _underlying.isComplete())
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java
index 7d3bf90a75..2aab3081ee 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java
@@ -18,11 +18,13 @@
* under the License.
*
*/
-package org.apache.qpid.server.store;
+package org.apache.qpid.server.util;
-public interface StoreFuture
+import java.util.concurrent.TimeoutException;
+
+public interface FutureResult
{
- StoreFuture IMMEDIATE_FUTURE = new StoreFuture()
+ FutureResult IMMEDIATE_FUTURE = new FutureResult()
{
public boolean isComplete()
{
@@ -32,9 +34,17 @@ public interface StoreFuture
public void waitForCompletion()
{
}
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+
+ }
};
boolean isComplete();
void waitForCompletion();
+
+ void waitForCompletion(long timeout) throws TimeoutException;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
index ec0908efba..a61ac4f5d2 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
@@ -27,7 +27,7 @@ import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction.FutureRecorder;
import org.apache.qpid.server.txn.ServerTransaction.Action;
@@ -43,7 +43,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase
private MessageStore _messageStore = mock(MessageStore.class);
private Transaction _storeTransaction = mock(Transaction.class);
private Action _postTransactionAction = mock(Action.class);
- private StoreFuture _future = mock(StoreFuture.class);
+ private FutureResult _future = mock(FutureResult.class);
@Override
@@ -136,7 +136,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase
asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction);
verifyZeroInteractions(_storeTransaction);
- verify(_futureRecorder).recordFuture(StoreFuture.IMMEDIATE_FUTURE, _postTransactionAction);
+ verify(_futureRecorder).recordFuture(FutureResult.IMMEDIATE_FUTURE, _postTransactionAction);
verifyZeroInteractions(_postTransactionAction);
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
index da868a01f1..6fcfde0221 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
@@ -24,7 +24,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.NullMessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -96,7 +96,7 @@ class MockStoreTransaction implements Transaction
_state = TransactionState.COMMITTED;
}
- public StoreFuture commitTranAsync()
+ public FutureResult commitTranAsync()
{
throw new NotImplementedException();
}
@@ -126,4 +126,4 @@ class MockStoreTransaction implements Transaction
}
};
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 579c885053..50b957d066 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -75,7 +75,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
@@ -1012,17 +1012,17 @@ public class ServerSession extends Session
return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast();
}
- public void recordFuture(final StoreFuture future, final ServerTransaction.Action action)
+ public void recordFuture(final FutureResult future, final ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
private static class AsyncCommand
{
- private final StoreFuture _future;
+ private final FutureResult _future;
private ServerTransaction.Action _action;
- public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action)
+ public AsyncCommand(final FutureResult future, final ServerTransaction.Action action)
{
_future = future;
_action = action;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 95d54579a7..c42630d0c6 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -57,7 +57,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
@@ -132,7 +132,7 @@ public class ServerSessionDelegate extends SessionDelegate
serverSession.accept(method.getTransfers());
if(!serverSession.isTransactional())
{
- serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
+ serverSession.recordFuture(FutureResult.IMMEDIATE_FUTURE,
new CommandProcessedAction(serverSession, method));
}
}
@@ -433,7 +433,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
+ serverSession.recordFuture(FutureResult.IMMEDIATE_FUTURE,
new CommandProcessedAction(serverSession, xfr));
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 8736bbeb3b..7c79e00c0b 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -97,7 +97,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
@@ -1782,7 +1782,7 @@ public class AMQChannel
}
}
- public void recordFuture(final StoreFuture future, final ServerTransaction.Action action)
+ public void recordFuture(final FutureResult future, final ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
@@ -1808,10 +1808,10 @@ public class AMQChannel
private static class AsyncCommand
{
- private final StoreFuture _future;
+ private final FutureResult _future;
private ServerTransaction.Action _action;
- public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action)
+ public AsyncCommand(final FutureResult future, final ServerTransaction.Action action)
{
_future = future;
_action = action;
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
index ce612ec0b6..63c60d7400 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.server.store.AbstractJDBCMessageStore
@@ -131,7 +131,7 @@ public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.se
}
@Override
- public StoreFuture commitTranAsync()
+ public FutureResult commitTranAsync()
{
try
{
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index c03dc4e1be..701c704fb6 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.util.StateChangeListener;
class ManagementNodeConsumer implements ConsumerImpl
@@ -122,9 +123,9 @@ class ManagementNodeConsumer implements ConsumerImpl
}
@Override
- public void close()
+ public FutureResult close()
{
-
+ return FutureResult.IMMEDIATE_FUTURE;
}
@Override