diff options
| author | Keith Wall <kwall@apache.org> | 2011-08-25 10:46:39 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-08-25 10:46:39 +0000 |
| commit | f9f222d00f57c52ccdb526b4bd7e991a05755fb3 (patch) | |
| tree | 2b4b508f10f0fb14a5e8283cf6ddfeccf1d7a3d6 /java | |
| parent | dd48476d3eacf9b4d084604942956f8e29e9ddf6 (diff) | |
| download | qpid-python-f9f222d00f57c52ccdb526b4bd7e991a05755fb3.tar.gz | |
QPID-3452: Broker now unregisters any remaining subscriptions on receipt of SessionDetach to prevent SubFlushRunner and QueueRunner sending erroneous frames causing a ProtocolViolationException on the client.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1161492 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java | 21 | ||||
| -rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/Connection.java | 5 |
2 files changed, 23 insertions, 3 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index f65cad23e7..b3acf48676 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.transport; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -36,6 +37,7 @@ import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticationResult; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; +import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; @@ -194,4 +196,23 @@ public class ServerConnectionDelegate extends ServerDelegate { return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(); } + + @Override public void sessionDetach(Connection conn, SessionDetach dtc) + { + // To ensure a clean detach, we unregister any remaining subscriptions. Unregister ensures + // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the unregister + // completes. + unregisterAllSubscriptions(conn, dtc); + super.sessionDetach(conn, dtc); + } + + private void unregisterAllSubscriptions(Connection conn, SessionDetach dtc) + { + final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel()); + final Collection<Subscription_0_10> subs = ssn.getSubscriptions(); + for (Subscription_0_10 subscription_0_10 : subs) + { + ssn.unregister(subscription_0_10); + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index eef6c047d3..82a6cdaa67 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -35,7 +35,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; @@ -406,7 +405,7 @@ public class Connection extends ConnectionInvoker else { throw new ProtocolViolationException( - "Received frames for an already dettached session", null); + "Received frames for an already detached session", null); } } @@ -455,7 +454,7 @@ public class Connection extends ConnectionInvoker } } - protected Session getSession(int channel) + public Session getSession(int channel) { synchronized (lock) { |
