summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfig.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java50
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java16
6 files changed, 76 insertions, 7 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 381c376f56..c32fcfd73a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -1214,8 +1214,9 @@ public class QMFService implements ConfigStore.ConfigEventListener
public BrokerSchema.ConnectionClass.CloseMethodResponseCommand close(final BrokerSchema.ConnectionClass.CloseMethodResponseCommandFactory factory)
{
- //todo
- throw new UnsupportedOperationException();
+ _obj.mgmtClose();
+
+ return factory.createResponseCommand();
}
public UUID getId()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfig.java
index ad451f44a7..0dd36fe1fe 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfig.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfig.java
@@ -44,4 +44,6 @@ public interface ConnectionConfig extends ConfiguredObject<ConnectionConfigType,
ConfigStore getConfigStore();
Boolean isShadow();
+
+ void mgmtClose();
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
index ba305c96fa..fa2fb9ead1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
@@ -188,6 +188,11 @@ public class BrokerLink implements LinkConfig, ConnectionListener
{
return false;
}
+
+ public void mgmtClose()
+ {
+ _connectionConfig.mgmtClose();
+ }
}
private class SessionFactory implements Connection.SessionFactory
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index e2735d151b..01ea7e5bb1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -46,6 +46,7 @@ import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.management.Managable;
@@ -61,6 +62,7 @@ import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
import javax.management.JMException;
+import javax.management.MBeanException;
import javax.security.sasl.SaslServer;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -703,6 +705,9 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
}
closeAllChannels();
+
+ getConfigStore().removeConfiguredObject(this);
+
if (_managedObject != null)
{
_managedObject.unregister();
@@ -763,8 +768,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
public void closeProtocolSession()
{
- getConfigStore().removeConfiguredObject(this);
-
_networkDriver.close();
try
{
@@ -1143,5 +1146,48 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
{
return false;
}
+
+ public void mgmtClose()
+ {
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ConnectionCloseBody responseBody =
+ methodRegistry.createConnectionCloseBody(
+ AMQConstant.REPLY_SUCCESS.getCode(),
+ new AMQShortString("The connection was closed using the broker's management interface."),
+ 0,0);
+
+ // This seems ugly but because we use closeConnection in both normal
+ // broker operation and as part of the management interface it cannot
+ // be avoided. The Current Actor will be null when this method is
+ // called via the QMF management interface. As such we need to set one.
+ boolean removeActor = false;
+ if (CurrentActor.get() == null)
+ {
+ removeActor = true;
+ CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger()));
+ }
+
+ try
+ {
+ writeFrame(responseBody.generateFrame(0));
+
+ try
+ {
+
+ closeSession();
+ }
+ catch (AMQException ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+ finally
+ {
+ if (removeActor)
+ {
+ CurrentActor.remove();
+ }
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
index 56784e7251..f1e79839c9 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -188,4 +188,9 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine
{
return false;
}
+
+ public void mgmtClose()
+ {
+ _connection.mgmtClose();
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 2ca5d28f42..1f559f690f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -532,13 +532,23 @@ public class Connection extends ConnectionInvoker
public void close()
{
+ close(ConnectionCloseCode.NORMAL, null);
+ }
+
+ public void mgmtClose()
+ {
+ close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface.");
+ }
+
+ public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options)
+ {
synchronized (lock)
{
switch (state)
{
case OPEN:
state = CLOSING;
- connectionClose(ConnectionCloseCode.NORMAL, null);
+ connectionClose(replyCode, replyText, _options);
Waiter w = new Waiter(lock, timeout);
while (w.hasTime() && state == CLOSING && error == null)
{
@@ -547,14 +557,14 @@ public class Connection extends ConnectionInvoker
if (error != null)
{
- close();
+ close(replyCode, replyText, _options);
throw new ConnectionException(error);
}
switch (state)
{
case CLOSING:
- close();
+ close(replyCode, replyText, _options);
throw new ConnectionException("close() timed out");
case CLOSED:
break;