diff options
6 files changed, 76 insertions, 7 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 381c376f56..c32fcfd73a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -1214,8 +1214,9 @@ public class QMFService implements ConfigStore.ConfigEventListener public BrokerSchema.ConnectionClass.CloseMethodResponseCommand close(final BrokerSchema.ConnectionClass.CloseMethodResponseCommandFactory factory) { - //todo - throw new UnsupportedOperationException(); + _obj.mgmtClose(); + + return factory.createResponseCommand(); } public UUID getId() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfig.java index ad451f44a7..0dd36fe1fe 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfig.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfig.java @@ -44,4 +44,6 @@ public interface ConnectionConfig extends ConfiguredObject<ConnectionConfigType, ConfigStore getConfigStore(); Boolean isShadow(); + + void mgmtClose(); }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java index ba305c96fa..fa2fb9ead1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java @@ -188,6 +188,11 @@ public class BrokerLink implements LinkConfig, ConnectionListener { return false; } + + public void mgmtClose() + { + _connectionConfig.mgmtClose(); + } } private class SessionFactory implements Connection.SessionFactory diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index e2735d151b..01ea7e5bb1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -46,6 +46,7 @@ import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.AMQPConnectionActor; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; import org.apache.qpid.server.management.Managable; @@ -61,6 +62,7 @@ import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.Sender; import javax.management.JMException; +import javax.management.MBeanException; import javax.security.sasl.SaslServer; import java.io.IOException; import java.net.InetSocketAddress; @@ -703,6 +705,9 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol } closeAllChannels(); + + getConfigStore().removeConfiguredObject(this); + if (_managedObject != null) { _managedObject.unregister(); @@ -763,8 +768,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public void closeProtocolSession() { - getConfigStore().removeConfiguredObject(this); - _networkDriver.close(); try { @@ -1143,5 +1146,48 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { return false; } + + public void mgmtClose() + { + MethodRegistry methodRegistry = getMethodRegistry(); + ConnectionCloseBody responseBody = + methodRegistry.createConnectionCloseBody( + AMQConstant.REPLY_SUCCESS.getCode(), + new AMQShortString("The connection was closed using the broker's management interface."), + 0,0); + + // This seems ugly but because we use closeConnection in both normal + // broker operation and as part of the management interface it cannot + // be avoided. The Current Actor will be null when this method is + // called via the QMF management interface. As such we need to set one. + boolean removeActor = false; + if (CurrentActor.get() == null) + { + removeActor = true; + CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger())); + } + + try + { + writeFrame(responseBody.generateFrame(0)); + + try + { + + closeSession(); + } + catch (AMQException ex) + { + throw new RuntimeException(ex); + } + } + finally + { + if (removeActor) + { + CurrentActor.remove(); + } + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index 56784e7251..f1e79839c9 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -188,4 +188,9 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine { return false; } + + public void mgmtClose() + { + _connection.mgmtClose(); + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 2ca5d28f42..1f559f690f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -532,13 +532,23 @@ public class Connection extends ConnectionInvoker public void close() { + close(ConnectionCloseCode.NORMAL, null); + } + + public void mgmtClose() + { + close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface."); + } + + public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options) + { synchronized (lock) { switch (state) { case OPEN: state = CLOSING; - connectionClose(ConnectionCloseCode.NORMAL, null); + connectionClose(replyCode, replyText, _options); Waiter w = new Waiter(lock, timeout); while (w.hasTime() && state == CLOSING && error == null) { @@ -547,14 +557,14 @@ public class Connection extends ConnectionInvoker if (error != null) { - close(); + close(replyCode, replyText, _options); throw new ConnectionException(error); } switch (state) { case CLOSING: - close(); + close(replyCode, replyText, _options); throw new ConnectionException("close() timed out"); case CLOSED: break; |
