From f9f222d00f57c52ccdb526b4bd7e991a05755fb3 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Thu, 25 Aug 2011 10:46:39 +0000 Subject: QPID-3452: Broker now unregisters any remaining subscriptions on receipt of SessionDetach to prevent SubFlushRunner and QueueRunner sending erroneous frames causing a ProtocolViolationException on the client. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1161492 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/transport/ServerConnectionDelegate.java | 21 +++++++++++++++++++++ .../java/org/apache/qpid/transport/Connection.java | 5 ++--- 2 files changed, 23 insertions(+), 3 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index f65cad23e7..b3acf48676 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.transport; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -36,6 +37,7 @@ import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticationResult; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; +import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; @@ -194,4 +196,23 @@ public class ServerConnectionDelegate extends ServerDelegate { return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(); } + + @Override public void sessionDetach(Connection conn, SessionDetach dtc) + { + // To ensure a clean detach, we unregister any remaining subscriptions. Unregister ensures + // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the unregister + // completes. + unregisterAllSubscriptions(conn, dtc); + super.sessionDetach(conn, dtc); + } + + private void unregisterAllSubscriptions(Connection conn, SessionDetach dtc) + { + final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel()); + final Collection subs = ssn.getSubscriptions(); + for (Subscription_0_10 subscription_0_10 : subs) + { + ssn.unregister(subscription_0_10); + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index eef6c047d3..82a6cdaa67 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -35,7 +35,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; @@ -406,7 +405,7 @@ public class Connection extends ConnectionInvoker else { throw new ProtocolViolationException( - "Received frames for an already dettached session", null); + "Received frames for an already detached session", null); } } @@ -455,7 +454,7 @@ public class Connection extends ConnectionInvoker } } - protected Session getSession(int channel) + public Session getSession(int channel) { synchronized (lock) { -- cgit v1.2.1