From a485a09a10e0ce6b7b8705cab9cd2322f80ad748 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Fri, 9 Apr 2010 14:16:26 +0000 Subject: QPID-2379: add Session.close() method implementation git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932429 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/qmf/QMFService.java | 14 ++++++-- .../java/org/apache/qpid/server/AMQChannel.java | 26 ++++++++------ .../qpid/server/configuration/SessionConfig.java | 4 +++ .../qpid/server/protocol/AMQProtocolEngine.java | 41 ++++++++++++++++++++++ .../qpid/server/protocol/AMQProtocolSession.java | 1 + .../qpid/server/transport/ServerSession.java | 5 +++ 6 files changed, 78 insertions(+), 13 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index c32fcfd73a..4491f3f7d5 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -21,6 +21,7 @@ package org.apache.qpid.qmf; +import org.apache.qpid.AMQException; import org.apache.qpid.qmf.schema.BrokerSchema; import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -1336,8 +1337,17 @@ public class QMFService implements ConfigStore.ConfigEventListener public BrokerSchema.SessionClass.CloseMethodResponseCommand close(final BrokerSchema.SessionClass.CloseMethodResponseCommandFactory factory) { - //todo - throw new UnsupportedOperationException(); + try + { + _obj.mgmtClose(); + } + catch (AMQException e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + return factory.createResponseCommand(); } public UUID getId() diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index fe5da20fa5..454b731e5f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -141,7 +141,7 @@ public class AMQChannel implements SessionConfig // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; - private boolean _closing; + private AtomicBoolean _closing = new AtomicBoolean(false); private final ConcurrentMap _blockingQueues = new ConcurrentHashMap(); @@ -480,7 +480,13 @@ public class AMQChannel implements SessionConfig */ public void close() throws AMQException { - setClosing(true); + if(!_closing.compareAndSet(false, true)) + { + //Channel is already closing + return; + } + + CurrentActor.get().message(_logSubject, ChannelMessages.CHN_CLOSE()); unsubscribeAllConsumers(); _transaction.rollback(); @@ -498,13 +504,6 @@ public class AMQChannel implements SessionConfig } - private void setClosing(boolean closing) - { - _closing = closing; - - CurrentActor.get().message(_logSubject, ChannelMessages.CHN_CLOSE()); - } - private void unsubscribeAllConsumers() throws AMQException { if (_logger.isInfoEnabled()) @@ -881,7 +880,7 @@ public class AMQChannel implements SessionConfig public boolean isSuspended() { - return _suspended.get() || _closing || _session.isClosing(); + return _suspended.get() || _closing.get() || _session.isClosing(); } public void commit() throws AMQException @@ -981,7 +980,7 @@ public class AMQChannel implements SessionConfig public boolean isClosing() { - return _closing; + return _closing.get(); } public AMQProtocolSession getProtocolSession() @@ -1377,4 +1376,9 @@ public class AMQChannel implements SessionConfig { return _createTime; } + + public void mgmtClose() throws AMQException + { + _session.mgmtCloseChannel(_channelId); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java index 5e5dd10e57..8fef642eff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.configuration; +import org.apache.qpid.AMQException; + public interface SessionConfig extends ConfiguredObject { VirtualHostConfig getVirtualHost(); @@ -48,4 +50,6 @@ public interface SessionConfig extends ConfiguredObject