diff options
Diffstat (limited to 'qpid/java')
2 files changed, 39 insertions, 77 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index ef760ade6a..0b7be2c28a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; @@ -34,7 +35,6 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.AbstractConfiguredObject; -import org.apache.qpid.server.model.CloseFuture; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Port; @@ -54,8 +54,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection private final Action _underlyingConnectionDeleteTask; private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false); - private AMQConnectionModel _underlyingConnection; - private final AtomicBoolean _closing = new AtomicBoolean(); + private final AMQConnectionModel _underlyingConnection; public ConnectionAdapter(final AMQConnectionModel conn) { @@ -70,7 +69,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection { conn.removeDeleteTask(this); _underlyingClosed.set(true); - deleted(); + deleteAsync(); } }; conn.addDeleteTask(_underlyingConnectionDeleteTask); @@ -163,38 +162,52 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED) private ListenableFuture<Void> doDelete() { - final SettableFuture<Void> returnVal = SettableFuture.create(); - asyncClose().addListener( - new Runnable() - { - @Override - public void run() + if (_underlyingClosed.get()) + { + deleted(); + return Futures.immediateFuture(null); + } + else + { + final SettableFuture<Void> returnVal = SettableFuture.create(); + asyncCloseUnderlying().addListener( + new Runnable() { - try - { - deleted(); - setState(State.DELETED); - } - finally + @Override + public void run() { - returnVal.set(null); + try + { + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } } - } - }, getTaskExecutor().getExecutor() - ); - return returnVal; + }, getTaskExecutor().getExecutor() + ); + return returnVal; + } } @Override protected ListenableFuture<Void> beforeClose() { - _closing.set(true); + if (_underlyingClosed.get()) + { + return Futures.immediateFuture(null); + } + else + { - return asyncClose(); + return asyncCloseUnderlying(); + } } - private ListenableFuture<Void> asyncClose() + private ListenableFuture<Void> asyncCloseUnderlying() { final SettableFuture<Void> closeFuture = SettableFuture.create(); @@ -206,6 +219,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection closeFuture.set(null); } }); + _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask); _underlyingConnection.closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); return closeFuture; @@ -279,56 +293,4 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection { // SessionAdapter installs delete task to cause session model object to delete } - - - private static class ConnectionCloseFuture implements CloseFuture - { - private boolean _closed; - - public synchronized void connectionClosed() - { - _closed = true; - notifyAll(); - - } - - @Override - public void runWhenComplete(final Runnable closeRunnable) - { - if (_closed ) - { - closeRunnable.run(); - } - else - { - Thread t = new Thread(new Runnable() - { - @Override - public void run() - { - synchronized (ConnectionCloseFuture.this) - { - while (!_closed) - { - try - { - ConnectionCloseFuture.this.wait(); - } - catch (InterruptedException e) - { - } - } - - closeRunnable.run(); - } - } - }); - - t.setDaemon(true); - t.start(); - - } - } - } - } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index cb412e8d41..1a2a3431ad 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -73,7 +73,7 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl public void performAction(final Object object) { session.removeDeleteTask(this); - deleted(); + deleteAsync(); } }); setState(State.ACTIVE); |
