summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java114
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java2
2 files changed, 39 insertions, 77 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index ef760ade6a..0b7be2c28a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
@@ -34,7 +35,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.AbstractConfiguredObject;
-import org.apache.qpid.server.model.CloseFuture;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Port;
@@ -54,8 +54,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
private final Action _underlyingConnectionDeleteTask;
private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false);
- private AMQConnectionModel _underlyingConnection;
- private final AtomicBoolean _closing = new AtomicBoolean();
+ private final AMQConnectionModel _underlyingConnection;
public ConnectionAdapter(final AMQConnectionModel conn)
{
@@ -70,7 +69,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
{
conn.removeDeleteTask(this);
_underlyingClosed.set(true);
- deleted();
+ deleteAsync();
}
};
conn.addDeleteTask(_underlyingConnectionDeleteTask);
@@ -163,38 +162,52 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
private ListenableFuture<Void> doDelete()
{
- final SettableFuture<Void> returnVal = SettableFuture.create();
- asyncClose().addListener(
- new Runnable()
- {
- @Override
- public void run()
+ if (_underlyingClosed.get())
+ {
+ deleted();
+ return Futures.immediateFuture(null);
+ }
+ else
+ {
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ asyncCloseUnderlying().addListener(
+ new Runnable()
{
- try
- {
- deleted();
- setState(State.DELETED);
- }
- finally
+ @Override
+ public void run()
{
- returnVal.set(null);
+ try
+ {
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
}
- }
- }, getTaskExecutor().getExecutor()
- );
- return returnVal;
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
+ }
}
@Override
protected ListenableFuture<Void> beforeClose()
{
- _closing.set(true);
+ if (_underlyingClosed.get())
+ {
+ return Futures.immediateFuture(null);
+ }
+ else
+ {
- return asyncClose();
+ return asyncCloseUnderlying();
+ }
}
- private ListenableFuture<Void> asyncClose()
+ private ListenableFuture<Void> asyncCloseUnderlying()
{
final SettableFuture<Void> closeFuture = SettableFuture.create();
@@ -206,6 +219,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
closeFuture.set(null);
}
});
+ _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask);
_underlyingConnection.closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
return closeFuture;
@@ -279,56 +293,4 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
{
// SessionAdapter installs delete task to cause session model object to delete
}
-
-
- private static class ConnectionCloseFuture implements CloseFuture
- {
- private boolean _closed;
-
- public synchronized void connectionClosed()
- {
- _closed = true;
- notifyAll();
-
- }
-
- @Override
- public void runWhenComplete(final Runnable closeRunnable)
- {
- if (_closed )
- {
- closeRunnable.run();
- }
- else
- {
- Thread t = new Thread(new Runnable()
- {
- @Override
- public void run()
- {
- synchronized (ConnectionCloseFuture.this)
- {
- while (!_closed)
- {
- try
- {
- ConnectionCloseFuture.this.wait();
- }
- catch (InterruptedException e)
- {
- }
- }
-
- closeRunnable.run();
- }
- }
- });
-
- t.setDaemon(true);
- t.start();
-
- }
- }
- }
-
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index cb412e8d41..1a2a3431ad 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -73,7 +73,7 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
public void performAction(final Object object)
{
session.removeDeleteTask(this);
- deleted();
+ deleteAsync();
}
});
setState(State.ACTIVE);