diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-19 17:32:33 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-19 17:32:33 +0000 |
| commit | 67a9864ed236ed085e263beaea7bae2c52522331 (patch) | |
| tree | 7b68eea9dfab1c7f958b68541a00dfa17da3a334 /qpid/java/broker-plugins | |
| parent | db70f1d2908f294fee0ed47cdb478c3ab0f3b252 (diff) | |
| download | qpid-python-67a9864ed236ed085e263beaea7bae2c52522331.tar.gz | |
Make close return a future, wait on Future in broker shutdown
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1660949 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
5 files changed, 16 insertions, 15 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 579c885053..50b957d066 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -75,7 +75,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AlreadyKnownDtxException; @@ -1012,17 +1012,17 @@ public class ServerSession extends Session return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast(); } - public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) + public void recordFuture(final FutureResult future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } private static class AsyncCommand { - private final StoreFuture _future; + private final FutureResult _future; private ServerTransaction.Action _action; - public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) + public AsyncCommand(final FutureResult future, final ServerTransaction.Action action) { _future = future; _action = action; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 95d54579a7..c42630d0c6 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -57,7 +57,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.DtxNotSelectedException; @@ -132,7 +132,7 @@ public class ServerSessionDelegate extends SessionDelegate serverSession.accept(method.getTransfers()); if(!serverSession.isTransactional()) { - serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, + serverSession.recordFuture(FutureResult.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, method)); } } @@ -433,7 +433,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, + serverSession.recordFuture(FutureResult.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr)); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 8736bbeb3b..7c79e00c0b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -97,7 +97,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; @@ -1782,7 +1782,7 @@ public class AMQChannel } } - public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) + public void recordFuture(final FutureResult future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } @@ -1808,10 +1808,10 @@ public class AMQChannel private static class AsyncCommand { - private final StoreFuture _future; + private final FutureResult _future; private ServerTransaction.Action _action; - public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) + public AsyncCommand(final FutureResult future, final ServerTransaction.Action action) { _future = future; _action = action; diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java index ce612ec0b6..63c60d7400 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.server.store.AbstractJDBCMessageStore @@ -131,7 +131,7 @@ public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.se } @Override - public StoreFuture commitTranAsync() + public FutureResult commitTranAsync() { try { diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index c03dc4e1be..701c704fb6 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.util.StateChangeListener; class ManagementNodeConsumer implements ConsumerImpl @@ -122,9 +123,9 @@ class ManagementNodeConsumer implements ConsumerImpl } @Override - public void close() + public FutureResult close() { - + return FutureResult.IMMEDIATE_FUTURE; } @Override |
