diff options
| author | Keith Wall <kwall@apache.org> | 2012-03-19 14:00:37 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-03-19 14:00:37 +0000 |
| commit | e9a23033bb075f50b0a46c9366012e30538a4e54 (patch) | |
| tree | ca952b4b7109cea8ff7ca7896f708c525ec3410a /qpid/java/broker/src/main | |
| parent | 617810d8db1359d81688be587908cebe77d3f559 (diff) | |
| download | qpid-python-e9a23033bb075f50b0a46c9366012e30538a4e54.tar.gz | |
QPID-3895: Remove blocked channel/session from the list of blocked channels on channel/session close
This patch adds the fllowing:
- fixes AMQChannel to stop sending flow commands if channel is closing
- fixes AMQChannel#compareTo ServerSession#compareTo
- removes AMQSessionModel#getID() method from AMQChannel and Server session in order to avoid confusions
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1302455 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
4 files changed, 10 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 588b2079f2..22f9544b0c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -1123,11 +1123,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } - public Object getID() - { - return _channelId; - } - public AMQConnectionModel getConnectionModel() { return _session; @@ -1377,7 +1372,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm { if(_blockingQueues.remove(queue)) { - if(_blocking.compareAndSet(true,false)) + if(_blocking.compareAndSet(true,false) && !isClosing()) { _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); @@ -1627,6 +1622,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm public int compareTo(AMQSessionModel session) { - return getId().toString().compareTo(session.getID().toString()); + return getId().compareTo(session.getId()); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index f6980be525..1a055240b9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -1315,7 +1315,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { - closeChannel((Integer)session.getID()); + int channelId = ((AMQChannel)session).getChannelId(); + closeChannel(channelId); MethodRegistry methodRegistry = getMethodRegistry(); ChannelCloseBody responseBody = @@ -1324,7 +1325,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr new AMQShortString(message), 0,0); - writeFrame(responseBody.generateFrame((Integer)session.getID())); + writeFrame(responseBody.generateFrame(channelId)); } public void close(AMQConstant cause, String message) throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index a69f2a74ee..fa171815ca 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol; +import java.util.UUID; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.qpid.AMQException; @@ -35,7 +36,8 @@ import org.apache.qpid.server.queue.SimpleAMQQueue; */ public interface AMQSessionModel extends Comparable<AMQSessionModel> { - public Object getID(); + /** Unique session ID across entire broker*/ + public UUID getId(); public AMQConnectionModel getConnectionModel(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 9f7b8c53b8..462e880e5f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -720,11 +720,6 @@ public class ServerSession extends Session close(); } - public Object getID() - { - return getName(); - } - public AMQConnectionModel getConnectionModel() { return getConnection(); @@ -854,7 +849,6 @@ public class ServerSession extends Session // unregister subscriptions in order to prevent sending of new messages // to subscriptions with closing session unregisterSubscriptions(); - super.close(); } @@ -1025,6 +1019,7 @@ public class ServerSession extends Session public int compareTo(AMQSessionModel session) { - return getId().toString().compareTo(session.getID().toString()); + return getId().compareTo(session.getId()); } + } |
