diff options
Diffstat (limited to 'java')
3 files changed, 47 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 42ccee3b3f..7393b17243 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -336,6 +336,12 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo } + @Override + protected void awaitClose() + { + // Broker shouldn't block awaiting close - thus do override this method to do nothing + } + public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry) { _transaction.dequeue(entry.getQueue(), entry.getMessage(), diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 73eebec7bc..b4bb6eb0b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -101,11 +101,14 @@ public class ServerSessionDelegate extends SessionDelegate public void command(Session session, Method method) { SecurityManager.setThreadPrincipal(session.getConnection().getAuthorizationID()); - - super.command(session, method); - if (method.isSync()) + + if(!session.isClosing()) { - session.flushProcessed(); + super.command(session, method); + if (method.isSync()) + { + session.flushProcessed(); + } } } @@ -1189,6 +1192,12 @@ public class ServerSessionDelegate extends SessionDelegate ((ServerSession)session).onClose(); } + @Override + public void detached(Session session) + { + closed(session); + } + public Collection<Subscription_0_10> getSubscriptions(Session session) { return ((ServerSession)session).getSubscriptions(); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 9b84ff422b..e989849477 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -543,7 +543,7 @@ public class Session extends SessionInvoker } } - if (state != OPEN && state != CLOSED) + if (state != OPEN && state != CLOSED && state != CLOSING) { Thread current = Thread.currentThread(); if (!current.equals(resumer)) @@ -568,6 +568,7 @@ public class Session extends SessionInvoker ("timed out waiting for resume to finish"); } break; + case CLOSING: case CLOSED: ExecutionException exc = getException(); if (exc != null) @@ -906,18 +907,23 @@ public class Session extends SessionInvoker sessionRequestTimeout(0); sessionDetach(name.getBytes()); - Waiter w = new Waiter(commands, timeout); - while (w.hasTime() && state != CLOSED) - { - w.await(); - } - - if (state != CLOSED) - { - throw new SessionException("close() timed out"); - } + awaitClose(); - connection.removeSession(this); + + } + } + + protected void awaitClose() + { + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && state != CLOSED) + { + w.await(); + } + + if (state != CLOSED) + { + throw new SessionException("close() timed out"); } } @@ -960,6 +966,16 @@ public class Session extends SessionInvoker delegate.detached(this); } } + + if(state == CLOSED) + { + connection.removeSession(this); + } + } + + public boolean isClosing() + { + return state == CLOSED || state == CLOSING; } public String toString() |
