diff options
Diffstat (limited to 'qpid/java/broker-plugins/jmx')
41 files changed, 9208 insertions, 0 deletions
diff --git a/qpid/java/broker-plugins/jmx/MANIFEST.MF b/qpid/java/broker-plugins/jmx/MANIFEST.MF new file mode 100644 index 0000000000..b13ff7f132 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/MANIFEST.MF @@ -0,0 +1,65 @@ +Manifest-Version: 1.0 +Bundle-ManifestVersion: 2 +Bundle-Name: Qpid Broker-Plugins JMX +Bundle-SymbolicName: broker-plugins-jmx +Bundle-Description: Management plugin for Qpid. +Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt +Bundle-DocURL: http://www.apache.org/ +Bundle-Version: 1.0.0 +Bundle-Activator: org.apache.qpid.server.jmx.JMXActivator +Bundle-RequiredExecutionEnvironment: JavaSE-1.6 +Bundle-ClassPath: . +Bundle-ActivationPolicy: lazy +Import-Package: org.apache.qpid, + org.apache.qpid.framing, + org.apache.qpid.protocol, + org.apache.qpid.common, + org.apache.qpid.management.common.mbeans, + org.apache.qpid.management.common.mbeans.annotations, + org.apache.qpid.server.security.auth, + org.apache.qpid.server.security.auth.manager, + org.apache.qpid.server.security.auth.rmi, + org.apache.qpid.server.security.auth.sasl, + org.apache.qpid.server.binding, + org.apache.qpid.server.exchange, + org.apache.qpid.server.logging, + org.apache.qpid.server.logging.actors, + org.apache.qpid.server.logging.messages, + org.apache.qpid.server.message, + org.apache.qpid.server.model, + org.apache.qpid.server.model.adapter, + org.apache.qpid.server.model.impl, + org.apache.qpid.server.configuration, + org.apache.qpid.server.configuration.plugins, + org.apache.qpid.server.connection, + org.apache.qpid.server.plugins, + org.apache.qpid.server.protocol, + org.apache.qpid.server.queue, + org.apache.qpid.server.registry, + org.apache.qpid.server.security, + org.apache.qpid.server.security.access, + org.apache.qpid.server.stats, + org.apache.qpid.server.virtualhost, + org.apache.qpid.util, + org.apache.commons.codec;version=1.3.0, + org.apache.commons.codec.binary;version=1.3.0, + org.apache.commons.configuration;version=1.0.0, + org.apache.commons.lang;version=1.0.0, + org.apache.commons.lang.builder;version=1.0.0, + org.apache.commons.lang.time;version=1.0.0, + org.apache.log4j;version=1.2.16, + org.codehaus.jackson;version=1.9.0, + org.codehaus.jackson.map;version=1.9.0, + javax.management.remote.rmi, + javax.management.remote, + javax.servlet, + javax.servlet.http, + javax.management;version=1.0.0, + javax.management.monitor;version=1.0.0, + javax.management.openmbean;version=1.0.0, + javax.security.auth.login;version=1.0.0, + javax.security.auth;version=1.0.0, + javax.rmi.ssl;version=1.0.0, + org.osgi.util.tracker;version=1.0.0, + org.osgi.framework;version=1.3 +Export-Package: org.apache.qpid.server.jmx;uses:="org.osgi.framework" diff --git a/qpid/java/broker-plugins/jmx/build.xml b/qpid/java/broker-plugins/jmx/build.xml new file mode 100644 index 0000000000..4deb0196e7 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/build.xml @@ -0,0 +1,43 @@ +<!-- + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + --> +<project name="Qpid Broker-Plugins JMX" default="build"> + + <condition property="systests.optional.depends" value="bdbstore" else=""> + <or> + <and> + <contains string="${modules.opt}" substring="bdbstore"/> + <contains string="${profile}" substring="bdb"/> + </and> + <and> + <istrue value="${optional}"/> + <contains string="${profile}" substring="bdb"/> + </and> + </or> + </condition> + + <property name="module.depends" value="common broker broker-plugins broker-plugins-jmx management/common" /> + <property name="module.test.depends" value="systests test broker/test common/test management/common client ${systests.optional.depends}" /> + + <property name="module.manifest" value="MANIFEST.MF" /> + <property name="module.plugin" value="true" /> + + <import file="../../module.xml" /> + + <target name="bundle" depends="bundle-tasks" /> +</project> diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/AMQManagedObject.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/AMQManagedObject.java new file mode 100644 index 0000000000..5c39a0c26a --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/AMQManagedObject.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx; + +import java.util.concurrent.atomic.AtomicLong; + +import javax.management.ListenerNotFoundException; +import javax.management.NotCompliantMBeanException; +import javax.management.NotificationBroadcaster; +import javax.management.NotificationBroadcasterSupport; +import javax.management.NotificationFilter; +import javax.management.NotificationListener; + +/** + * This class provides additional feature of Notification Broadcaster to the + * DefaultManagedObject. + * @author Bhupendra Bhardwaj + * @version 0.1 + */ +public abstract class AMQManagedObject extends DefaultManagedObject + implements NotificationBroadcaster +{ + private final NotificationBroadcasterSupport _broadcaster = new NotificationBroadcasterSupport(); + + private AtomicLong _notificationSequenceNumber = new AtomicLong(); + + protected AMQManagedObject(Class<?> managementInterface, String typeName, ManagedObjectRegistry registry) + throws NotCompliantMBeanException + { + super(managementInterface, typeName, registry); + // CurrentActor will be defined as these objects are created during + // broker startup. + + } + + // notification broadcaster implementation + + public void addNotificationListener(NotificationListener listener, + NotificationFilter filter, + Object handback) + { + _broadcaster.addNotificationListener(listener, filter, handback); + } + + public void removeNotificationListener(NotificationListener listener) + throws ListenerNotFoundException + { + _broadcaster.removeNotificationListener(listener); + } + + + /** + * broadcaster support class + */ + protected NotificationBroadcasterSupport getBroadcaster() + { + return _broadcaster; + } + + protected long incrementAndGetSequenceNumber() + { + return _notificationSequenceNumber.incrementAndGet(); + } + + +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/DefaultManagedObject.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/DefaultManagedObject.java new file mode 100644 index 0000000000..4446f96802 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/DefaultManagedObject.java @@ -0,0 +1,189 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx; + +import org.apache.log4j.Logger; + +import javax.management.JMException; +import javax.management.MBeanInfo; +import javax.management.MBeanNotificationInfo; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + +/** + * Provides implementation of the boilerplate ManagedObject interface. Most managed objects should find it useful + * to extend this class rather than implementing ManagedObject from scratch. + * + */ +public abstract class DefaultManagedObject extends StandardMBean implements ManagedObject +{ + private static final Logger LOGGER = Logger.getLogger(DefaultManagedObject.class); + + private final Class<?> _managementInterface; + + private final String _typeName; + + private final MBeanInfo _mbeanInfo; + + private ManagedObjectRegistry _registry; + + protected DefaultManagedObject(Class<?> managementInterface, String typeName, ManagedObjectRegistry registry) + throws NotCompliantMBeanException + { + super(managementInterface); + _registry = registry; + _managementInterface = managementInterface; + _typeName = typeName; + _mbeanInfo = buildMBeanInfo(); + } + + public ManagedObjectRegistry getRegistry() + { + return _registry; + } + + @Override + public MBeanInfo getMBeanInfo() + { + return _mbeanInfo; + } + + public String getType() + { + return _typeName; + } + + public Class<?> getManagementInterface() + { + return _managementInterface; + } + + public abstract ManagedObject getParentObject(); + + + public void register() throws JMException + { + _registry.registerObject(this); + } + + public void unregister() throws JMException + { + try + { + if(_registry != null) + { + _registry.unregisterObject(this); + } + } + finally + { + _registry = null; + } + } + + public String toString() + { + return getObjectInstanceName() + "[" + getType() + "]"; + } + + /** + * Created the ObjectName as per the JMX Specs + * @return ObjectName + * @throws javax.management.MalformedObjectNameException + */ + public ObjectName getObjectName() throws MalformedObjectNameException + { + String name = getObjectInstanceName(); + StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN); + + objectName.append(":type="); + objectName.append(getHierarchicalType(this)); + + objectName.append(","); + objectName.append(getHierarchicalName(this)); + objectName.append("name=").append(name); + + return new ObjectName(objectName.toString()); + } + + protected ObjectName getObjectNameForSingleInstanceMBean() throws MalformedObjectNameException + { + StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN); + + objectName.append(":type="); + objectName.append(getHierarchicalType(this)); + + String hierarchyName = getHierarchicalName(this); + if (hierarchyName != null) + { + objectName.append(","); + objectName.append(hierarchyName.substring(0, hierarchyName.lastIndexOf(","))); + } + + return new ObjectName(objectName.toString()); + } + + protected String getHierarchicalType(ManagedObject obj) + { + if (obj.getParentObject() != null) + { + String parentType = getHierarchicalType(obj.getParentObject()).toString(); + return parentType + "." + obj.getType(); + } + else + { + return obj.getType(); + } + } + + protected String getHierarchicalName(ManagedObject obj) + { + if (obj.getParentObject() != null) + { + String parentName = obj.getParentObject().getType() + "=" + + obj.getParentObject().getObjectInstanceName() + ","+ + getHierarchicalName(obj.getParentObject()); + + return parentName; + } + else + { + return ""; + } + } + + private MBeanInfo buildMBeanInfo() throws NotCompliantMBeanException + { + return new MBeanInfo(this.getClass().getName(), + MBeanIntrospector.getMBeanDescription(this.getClass()), + MBeanIntrospector.getMBeanAttributesInfo(getManagementInterface()), + MBeanIntrospector.getMBeanConstructorsInfo(this.getClass()), + MBeanIntrospector.getMBeanOperationsInfo(getManagementInterface()), + this.getNotificationInfo()); + } + + public MBeanNotificationInfo[] getNotificationInfo() + { + return null; + } +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/JMXActivator.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/JMXActivator.java new file mode 100644 index 0000000000..c588b40de7 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/JMXActivator.java @@ -0,0 +1,136 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceRegistration; + +public class JMXActivator implements BundleActivator +{ + private static final Logger LOGGER = Logger.getLogger(JMXActivator.class); + + private String _bundleName; + private JMXService _jmxService; + + private List<ServiceRegistration> _registeredServices; + + + public void start(final BundleContext ctx) throws Exception + { + boolean jmxManagementEnabled = ApplicationRegistry.getInstance().getConfiguration().getJMXManagementEnabled(); + + if (jmxManagementEnabled) + { + _jmxService = new JMXService(); + startJmsService(_jmxService); + + _bundleName = ctx.getBundle().getSymbolicName(); + + _registeredServices = registerServices(ctx); + } + else + { + LOGGER.debug("Skipping registration of JMX plugin as JMX Management disabled in config. "); + } + } + + public void stop(final BundleContext bundleContext) throws Exception + { + try + { + if (_jmxService != null) + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Stopping jmx plugin: " + _bundleName); + } + _jmxService.close(); + } + + if (_registeredServices != null) + { + unregisterServices(); + } + } + finally + { + _jmxService = null; + _registeredServices = null; + } + } + + + private List<ServiceRegistration> registerServices(BundleContext ctx) + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Registering jmx plugin: " + _bundleName); + } + + List<ServiceRegistration> serviceRegistrations = new ArrayList<ServiceRegistration>(); + + ServiceRegistration jmxServiceRegistration = ctx.registerService(JMXService.class.getName(), _jmxService, null); + ServiceRegistration jmxConfigFactoryRegistration = ctx.registerService(ConfigurationPluginFactory.class.getName(), JMXConfiguration.FACTORY, null); + + serviceRegistrations.add(jmxServiceRegistration); + serviceRegistrations.add(jmxConfigFactoryRegistration); + return serviceRegistrations; + } + + private void startJmsService(JMXService jmxService) throws Exception + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Starting JMX service"); + } + boolean startedSuccessfully = false; + try + { + jmxService.start(); + startedSuccessfully = true; + } + finally + { + if (!startedSuccessfully) + { + LOGGER.error("JMX failed to start normally, closing service"); + jmxService.close(); + } + } + } + + private void unregisterServices() + { + for (Iterator<ServiceRegistration> iterator = _registeredServices.iterator(); iterator.hasNext();) + { + ServiceRegistration service = iterator.next(); + service.unregister(); + } + } +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/JMXConfiguration.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/JMXConfiguration.java new file mode 100644 index 0000000000..dc9a712f90 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/JMXConfiguration.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx; + +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; + +import java.util.Arrays; +import java.util.List; + +public class JMXConfiguration extends ConfigurationPlugin +{ + CompositeConfiguration _finalConfig; + + public static final ConfigurationPluginFactory FACTORY = new ConfigurationPluginFactory() + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + ConfigurationPlugin instance = new JMXConfiguration(); + instance.setConfiguration(path, config); + return instance; + } + + public List<String> getParentPaths() + { + return Arrays.asList("jmx"); + } + }; + + public String[] getElementsProcessed() + { + return new String[] { "" }; + } + + public Configuration getConfiguration() + { + return _finalConfig; + } + + + @Override + public void validateConfiguration() throws ConfigurationException + { + // Valid Configuration either has xml links to new files + _finalConfig = new CompositeConfiguration(getConfig()); + List subFiles = getConfig().getList("xml[@fileName]"); + for (Object subFile : subFiles) + { + _finalConfig.addConfiguration(new XMLConfiguration((String) subFile)); + } + + } + +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java new file mode 100644 index 0000000000..0648235077 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java @@ -0,0 +1,499 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; + +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; + +import org.apache.qpid.server.security.auth.rmi.RMIPasswordAuthenticator; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; + +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; +import javax.management.Notification; +import javax.management.NotificationFilterSupport; +import javax.management.NotificationListener; +import javax.management.ObjectName; +import javax.management.remote.JMXConnectionNotification; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXServiceURL; +import javax.management.remote.MBeanServerForwarder; +import javax.management.remote.rmi.RMIConnection; +import javax.management.remote.rmi.RMIConnectorServer; +import javax.management.remote.rmi.RMIJRMPServerImpl; +import javax.management.remote.rmi.RMIServerImpl; +import javax.rmi.ssl.SslRMIClientSocketFactory; +import javax.rmi.ssl.SslRMIServerSocketFactory; +import javax.security.auth.Subject; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.reflect.Proxy; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.UnknownHostException; +import java.rmi.AlreadyBoundException; +import java.rmi.NoSuchObjectException; +import java.rmi.NotBoundException; +import java.rmi.registry.LocateRegistry; +import java.rmi.registry.Registry; +import java.rmi.server.RMIClientSocketFactory; +import java.rmi.server.RMIServerSocketFactory; +import java.rmi.server.UnicastRemoteObject; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class starts up an MBeanserver. If out of the box agent has been enabled then there are no + * security features implemented like user authentication and authorisation. + */ +public class JMXManagedObjectRegistry implements ManagedObjectRegistry +{ + private static final Logger _log = Logger.getLogger(JMXManagedObjectRegistry.class); + + private final MBeanServer _mbeanServer; + private JMXConnectorServer _cs; + private Registry _rmiRegistry; + private boolean _useCustomSocketFactory; + + private final int _jmxPortRegistryServer; + private final int _jmxPortConnectorServer; + + public JMXManagedObjectRegistry() throws AMQException + { + _log.info("Initialising managed object registry using platform MBean server"); + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + + // Retrieve the config parameters + _useCustomSocketFactory = appRegistry.getConfiguration().getUseCustomRMISocketFactory(); + boolean platformServer = appRegistry.getConfiguration().getPlatformMbeanserver(); + + _mbeanServer = + platformServer ? ManagementFactory.getPlatformMBeanServer() + : MBeanServerFactory.createMBeanServer(ManagedObject.DOMAIN); + + _jmxPortRegistryServer = appRegistry.getConfiguration().getJMXPortRegistryServer(); + _jmxPortConnectorServer = appRegistry.getConfiguration().getJMXConnectorServerPort(); + + } + + public void start() throws IOException, ConfigurationException + { + + CurrentActor.get().message(ManagementConsoleMessages.STARTUP()); + + //check if system properties are set to use the JVM's out-of-the-box JMXAgent + if (areOutOfTheBoxJMXOptionsSet()) + { + CurrentActor.get().message(ManagementConsoleMessages.READY(true)); + return; + } + + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + + + //Socket factories for the RMIConnectorServer, either default or SLL depending on configuration + RMIClientSocketFactory csf; + RMIServerSocketFactory ssf; + + //check ssl enabled option in config, default to true if option is not set + boolean sslEnabled = appRegistry.getConfiguration().getManagementSSLEnabled(); + + if (sslEnabled) + { + //set the SSL related system properties used by the SSL RMI socket factories to the values + //given in the configuration file, unless command line settings have already been specified + String keyStorePath; + + if(System.getProperty("javax.net.ssl.keyStore") != null) + { + keyStorePath = System.getProperty("javax.net.ssl.keyStore"); + } + else + { + keyStorePath = appRegistry.getConfiguration().getManagementKeyStorePath(); + } + + //check the keystore path value is valid + if (keyStorePath == null) + { + throw new ConfigurationException("JMX management SSL keystore path not defined, " + + "unable to start SSL protected JMX ConnectorServer"); + } + else + { + //ensure the system property is set + System.setProperty("javax.net.ssl.keyStore", keyStorePath); + + //check the file is usable + File ksf = new File(keyStorePath); + + if (!ksf.exists()) + { + throw new FileNotFoundException("Cannot find JMX management SSL keystore file: " + ksf); + } + if (!ksf.canRead()) + { + throw new FileNotFoundException("Cannot read JMX management SSL keystore file: " + + ksf + ". Check permissions."); + } + + CurrentActor.get().message(ManagementConsoleMessages.SSL_KEYSTORE(ksf.getAbsolutePath())); + } + + //check the key store password is set + if (System.getProperty("javax.net.ssl.keyStorePassword") == null) + { + + if (appRegistry.getConfiguration().getManagementKeyStorePassword() == null) + { + throw new ConfigurationException("JMX management SSL keystore password not defined, " + + "unable to start requested SSL protected JMX server"); + } + else + { + System.setProperty("javax.net.ssl.keyStorePassword", + appRegistry.getConfiguration().getManagementKeyStorePassword()); + } + } + + //create the SSL RMI socket factories + csf = new SslRMIClientSocketFactory(); + ssf = new SslRMIServerSocketFactory(); + } + else + { + //Do not specify any specific RMI socket factories, resulting in use of the defaults. + csf = null; + ssf = null; + } + + //add a JMXAuthenticator implementation the env map to authenticate the RMI based JMX connector server + RMIPasswordAuthenticator rmipa = new RMIPasswordAuthenticator(new InetSocketAddress(_jmxPortRegistryServer)); + HashMap<String,Object> env = new HashMap<String,Object>(); + env.put(JMXConnectorServer.AUTHENTICATOR, rmipa); + + /* + * Start a RMI registry on the management port, to hold the JMX RMI ConnectorServer stub. + * Using custom socket factory to prevent anyone (including us unfortunately) binding to the registry using RMI. + * As a result, only binds made using the object reference will succeed, thus securing it from external change. + */ + System.setProperty("java.rmi.server.randomIDs", "true"); + if(_useCustomSocketFactory) + { + _rmiRegistry = LocateRegistry.createRegistry(_jmxPortRegistryServer, null, new CustomRMIServerSocketFactory()); + } + else + { + _rmiRegistry = LocateRegistry.createRegistry(_jmxPortRegistryServer, null, null); + } + + CurrentActor.get().message(ManagementConsoleMessages.LISTENING("RMI Registry", _jmxPortRegistryServer)); + + /* + * We must now create the RMI ConnectorServer manually, as the JMX Factory methods use RMI calls + * to bind the ConnectorServer to the registry, which will now fail as for security we have + * locked it from any RMI based modifications, including our own. Instead, we will manually bind + * the RMIConnectorServer stub to the registry using its object reference, which will still succeed. + * + * The registry is exported on the defined management port 'port'. We will export the RMIConnectorServer + * on 'port +1'. Use of these two well-defined ports will ease any navigation through firewall's. + */ + final Map<String, String> connectionIdUsernameMap = new ConcurrentHashMap<String, String>(); + final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(_jmxPortConnectorServer, csf, ssf, env) + { + + /** + * Override makeClient so we can cache the username of the client in a Map keyed by connectionId. + * ConnectionId is guaranteed to be unique per client connection, according to the JMS spec. + * An instance of NotificationListener (mapCleanupListener) will be responsible for removing these Map + * entries. + * + * @see javax.management.remote.rmi.RMIJRMPServerImpl#makeClient(String, javax.security.auth.Subject) + */ + @Override + protected RMIConnection makeClient(String connectionId, Subject subject) throws IOException + { + final RMIConnection makeClient = super.makeClient(connectionId, subject); + final UsernamePrincipal usernamePrincipalFromSubject = UsernamePrincipal.getUsernamePrincipalFromSubject(subject); + connectionIdUsernameMap.put(connectionId, usernamePrincipalFromSubject.getName()); + return makeClient; + } + }; + + // Create a Listener responsible for removing the map entries add by the #makeClient entry above. + final NotificationListener mapCleanupListener = new NotificationListener() + { + + public void handleNotification(Notification notification, Object handback) + { + final String connectionId = ((JMXConnectionNotification) notification).getConnectionId(); + connectionIdUsernameMap.remove(connectionId); + } + }; + + String localHost; + try + { + localHost = InetAddress.getLocalHost().getHostName(); + } + catch(UnknownHostException ex) + { + localHost="127.0.0.1"; + } + final String hostname = localHost; + final JMXServiceURL externalUrl = new JMXServiceURL( + "service:jmx:rmi://"+hostname+":"+(_jmxPortConnectorServer)+"/jndi/rmi://"+hostname+":"+_jmxPortRegistryServer+"/jmxrmi"); + + final JMXServiceURL internalUrl = new JMXServiceURL("rmi", hostname, _jmxPortConnectorServer); + _cs = new RMIConnectorServer(internalUrl, env, rmiConnectorServerStub, _mbeanServer) + { + @Override + public synchronized void start() throws IOException + { + try + { + //manually bind the connector server to the registry at key 'jmxrmi', like the out-of-the-box agent + _rmiRegistry.bind("jmxrmi", rmiConnectorServerStub); + } + catch (AlreadyBoundException abe) + { + //key was already in use. shouldnt happen here as its a new registry, unbindable by normal means. + + //IOExceptions are the only checked type throwable by the method, wrap and rethrow + IOException ioe = new IOException(abe.getMessage()); + ioe.initCause(abe); + throw ioe; + } + + //now do the normal tasks + super.start(); + } + + @Override + public synchronized void stop() throws IOException + { + try + { + if (_rmiRegistry != null) + { + _rmiRegistry.unbind("jmxrmi"); + } + } + catch (NotBoundException nbe) + { + // TODO consider if we want to keep new logging + _log.error("Failed to unbind jmxrmi", nbe); + //ignore + } + + //now do the normal tasks + super.stop(); + } + + @Override + public JMXServiceURL getAddress() + { + //must return our pre-crafted url that includes the full details, inc JNDI details + return externalUrl; + } + + }; + + + //Add the custom invoker as an MBeanServerForwarder, and start the RMIConnectorServer. + MBeanServerForwarder mbsf = MBeanInvocationHandlerImpl.newProxyInstance(); + _cs.setMBeanServerForwarder(mbsf); + + + // Get the handler that is used by the above MBInvocationHandler Proxy. + // which is the MBeanInvocationHandlerImpl and so also a NotificationListener. + final NotificationListener invocationHandler = (NotificationListener) Proxy.getInvocationHandler(mbsf); + + // Install a notification listener on OPENED, CLOSED, and FAILED, + // passing the map of connection-ids to usernames as hand-back data. + final NotificationFilterSupport invocationHandlerFilter = new NotificationFilterSupport(); + invocationHandlerFilter.enableType(JMXConnectionNotification.OPENED); + invocationHandlerFilter.enableType(JMXConnectionNotification.CLOSED); + invocationHandlerFilter.enableType(JMXConnectionNotification.FAILED); + _cs.addNotificationListener(invocationHandler, invocationHandlerFilter, connectionIdUsernameMap); + + // Install a second notification listener on CLOSED AND FAILED only to remove the entry from the + // Map. Here we rely on the fact that JMX will call the listeners in the order in which they are + // installed. + final NotificationFilterSupport mapCleanupHandlerFilter = new NotificationFilterSupport(); + mapCleanupHandlerFilter.enableType(JMXConnectionNotification.CLOSED); + mapCleanupHandlerFilter.enableType(JMXConnectionNotification.FAILED); + _cs.addNotificationListener(mapCleanupListener, mapCleanupHandlerFilter, null); + + _cs.start(); + + String connectorServer = (sslEnabled ? "SSL " : "") + "JMX RMIConnectorServer"; + CurrentActor.get().message(ManagementConsoleMessages.LISTENING(connectorServer, _jmxPortConnectorServer)); + + CurrentActor.get().message(ManagementConsoleMessages.READY(false)); + } + + /* + * Custom RMIServerSocketFactory class, used to prevent updates to the RMI registry. + * Supplied to the registry at creation, this will prevent RMI-based operations on the + * registry such as attempting to bind a new object, thereby securing it from tampering. + * This is accomplished by always returning null when attempting to determine the address + * of the caller, thus ensuring the registry will refuse the attempt. Calls to bind etc + * made using the object reference will not be affected and continue to operate normally. + */ + + private static class CustomRMIServerSocketFactory implements RMIServerSocketFactory + { + + public ServerSocket createServerSocket(int port) throws IOException + { + return new NoLocalAddressServerSocket(port); + } + + private static class NoLocalAddressServerSocket extends ServerSocket + { + NoLocalAddressServerSocket(int port) throws IOException + { + super(port); + } + + @Override + public Socket accept() throws IOException + { + Socket s = new NoLocalAddressSocket(); + super.implAccept(s); + return s; + } + } + + private static class NoLocalAddressSocket extends Socket + { + @Override + public InetAddress getInetAddress() + { + return null; + } + } + } + + + public void registerObject(ManagedObject managedObject) throws JMException + { + _mbeanServer.registerMBean(managedObject, managedObject.getObjectName()); + } + + public void unregisterObject(ManagedObject managedObject) throws JMException + { + _mbeanServer.unregisterMBean(managedObject.getObjectName()); + } + + // checks if the system properties are set which enable the JVM's out-of-the-box JMXAgent. + private boolean areOutOfTheBoxJMXOptionsSet() + { + if (System.getProperty("com.sun.management.jmxremote") != null) + { + return true; + } + + if (System.getProperty("com.sun.management.jmxremote.port") != null) + { + return true; + } + + return false; + } + + //Stops the JMXConnectorServer and RMIRegistry, then unregisters any remaining MBeans from the MBeanServer + public void close() + { + _log.debug("close() called"); + + if (_cs != null) + { + // Stopping the JMX ConnectorServer + try + { + CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN("JMX RMIConnectorServer", _cs.getAddress().getPort())); + _cs.stop(); + } + catch (IOException e) + { + _log.error("Exception while closing the JMX ConnectorServer: ", e); + } + } + + if (_rmiRegistry != null) + { + // Stopping the RMI registry + CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN("RMI Registry", _jmxPortRegistryServer)); + try + { + boolean success = UnicastRemoteObject.unexportObject(_rmiRegistry, false); + if (!success) + { + _log.warn("Failed to unexport object " + _rmiRegistry); + } + } + catch (NoSuchObjectException e) + { + _log.error("Exception while closing the RMI Registry: ", e); + } + } + + //ObjectName query to gather all Qpid related MBeans + ObjectName mbeanNameQuery = null; + try + { + mbeanNameQuery = new ObjectName(ManagedObject.DOMAIN + ":*"); + } + catch (Exception e1) + { + _log.warn("Unable to generate MBean ObjectName query for close operation"); + } + + for (ObjectName name : _mbeanServer.queryNames(mbeanNameQuery, null)) + { + try + { + _mbeanServer.unregisterMBean(name); + } + catch (JMException e) + { + _log.error("Exception unregistering MBean '"+ name +"': " + e.getMessage()); + } + } + + CurrentActor.get().message(ManagementConsoleMessages.STOPPED()); + } + +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/JMXService.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/JMXService.java new file mode 100644 index 0000000000..7519cea4db --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/JMXService.java @@ -0,0 +1,189 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.jmx; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.ServiceLoader; + +import javax.management.JMException; +import javax.management.StandardMBean; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.jmx.mbeans.UserManagementMBean; +import org.apache.qpid.server.jmx.mbeans.ConfigurationManagementMBean; +import org.apache.qpid.server.jmx.mbeans.ServerInformationMBean; +import org.apache.qpid.server.jmx.mbeans.Shutdown; +import org.apache.qpid.server.jmx.mbeans.VirtualHostMBean; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.PasswordCredentialManagingAuthenticationProvider; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.registry.ApplicationRegistry; + + +public class JMXService implements ConfigurationChangeListener +{ + private static final ClassLoader BUNDLE_CLASSLOADER = JMXService.class.getClassLoader(); + + private static final Logger LOGGER = Logger.getLogger(JMXService.class); + + private final Broker _broker; + private final JMXManagedObjectRegistry _objectRegistry; + private final Shutdown _shutdown; + private final ServerInformationMBean _serverInfo; + private final ConfigurationManagementMBean _configManagement; + + private final Map<ConfiguredObject, AMQManagedObject> _children = new HashMap<ConfiguredObject, AMQManagedObject>(); + + public JMXService() throws AMQException, JMException + { + _broker = ApplicationRegistry.getInstance().getBroker(); + _objectRegistry = new JMXManagedObjectRegistry(); + + _broker.addChangeListener(this); + synchronized (_children) + { + for(VirtualHost virtualHost : _broker.getVirtualHosts()) + { + if(!_children.containsKey(virtualHost)) + { + _children.put(virtualHost, new VirtualHostMBean(virtualHost, _objectRegistry)); + } + } + } + _shutdown = new Shutdown(_objectRegistry); + _serverInfo = new ServerInformationMBean(_objectRegistry, _broker); + _configManagement = new ConfigurationManagementMBean(_objectRegistry); + } + + public void start() throws IOException, ConfigurationException + { + _objectRegistry.start(); + } + + public void close() + { + _broker.removeChangeListener(this); + + _objectRegistry.close(); + } + + public void stateChanged(ConfiguredObject object, State oldState, State newState) + { + + } + + public void childAdded(ConfiguredObject object, ConfiguredObject child) + { + synchronized (_children) + { + try + { + AMQManagedObject mbean; + if(child instanceof VirtualHost) + { + VirtualHost vhostChild = (VirtualHost)child; + mbean = new VirtualHostMBean(vhostChild, _objectRegistry); + } + else if(child instanceof PasswordCredentialManagingAuthenticationProvider) + { + mbean = new UserManagementMBean((PasswordCredentialManagingAuthenticationProvider) child, _objectRegistry); + } + else + { + mbean = null; + } + + if (mbean != null) + { + createAdditionalMBeansFromProviders(child, mbean); + } + } + catch(JMException e) + { + LOGGER.error("Error creating mbean", e); + // TODO - Implement error reporting on mbean creation + } + } + } + + + public void childRemoved(ConfiguredObject object, ConfiguredObject child) + { + // TODO - implement vhost removal (possibly just removing the instanceof check below) + + synchronized (_children) + { + if(child instanceof PasswordCredentialManagingAuthenticationProvider) + { + AMQManagedObject mbean = _children.remove(child); + if(mbean != null) + { + try + { + mbean.unregister(); + } + catch(JMException e) + { + LOGGER.error("Error creating mbean", e); + //TODO - report error on removing child MBean + } + } + } + + } + } + + private void createAdditionalMBeansFromProviders(ConfiguredObject child, AMQManagedObject mbean) throws JMException + { + _children.put(child, mbean); + + for (Iterator<MBeanProvider> iterator = getMBeanProviderIterator(); iterator.hasNext();) + { + MBeanProvider provider = iterator.next(); + LOGGER.debug("Consulting mbean provider : " + provider + " for child : " + child); + if (provider.isChildManageableByMBean(child)) + { + LOGGER.debug("Provider will create mbean "); + StandardMBean bean = provider.createMBean(child, mbean); + // TODO track the mbeans that have been created on behalf of a child in a map, then + // if the child is ever removed, destroy these beans too. + } + } + } + + /** + * Finds all classes implementing the {@link MBeanProvider} interface. This will find + * <b>only</b> those classes which are visible to the classloader of this OSGI bundle. + */ + private Iterator<MBeanProvider> getMBeanProviderIterator() + { + return ServiceLoader.load(MBeanProvider.class, BUNDLE_CLASSLOADER).iterator(); + } +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/MBeanIntrospector.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/MBeanIntrospector.java new file mode 100644 index 0000000000..79ddc8cfc0 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/MBeanIntrospector.java @@ -0,0 +1,400 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx; + +import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute; +import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanConstructorInfo; +import javax.management.MBeanOperationInfo; +import javax.management.MBeanParameterInfo; +import javax.management.NotCompliantMBeanException; +import java.lang.annotation.Annotation; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +/** + * This class is a utility class to introspect the MBean class and the management + * interface class for various purposes. + * @author Bhupendra Bhardwaj + * @version 0.1 + */ +class MBeanIntrospector +{ + + private static final String _defaultAttributeDescription = "Management attribute"; + private static final String _defaultOerationDescription = "Management operation"; + private static final String _defaultConstructorDescription = "MBean constructor"; + private static final String _defaultMbeanDescription = "Management interface of the MBean"; + + private MBeanIntrospector() + { + } + + /** + * Introspects the management interface class for MBean attributes. + * @param interfaceClass + * @return MBeanAttributeInfo[] + * @throws javax.management.NotCompliantMBeanException + */ + static MBeanAttributeInfo[] getMBeanAttributesInfo(Class interfaceClass) + throws NotCompliantMBeanException + { + List<MBeanAttributeInfo> attributesList = new ArrayList<MBeanAttributeInfo>(); + + /** + * Using reflection, all methods of the managemetn interface will be analysed, + * and MBeanInfo will be created. + */ + for (Method method : interfaceClass.getMethods()) + { + String name = method.getName(); + Class<?> resultType = method.getReturnType(); + MBeanAttributeInfo attributeInfo = null; + + if (isAttributeGetterMethod(method)) + { + String desc = getAttributeDescription(method); + attributeInfo = new MBeanAttributeInfo(name.substring(3), + resultType.getName(), + desc, + true, + false, + false); + int index = getIndexIfAlreadyExists(attributeInfo, attributesList); + if (index == -1) + { + attributesList.add(attributeInfo); + } + else + { + attributeInfo = new MBeanAttributeInfo(name.substring(3), + resultType.getName(), + desc, + true, + true, + false); + attributesList.set(index, attributeInfo); + } + } + else if (isAttributeSetterMethod(method)) + { + String desc = getAttributeDescription(method); + attributeInfo = new MBeanAttributeInfo(name.substring(3), + method.getParameterTypes()[0].getName(), + desc, + false, + true, + false); + int index = getIndexIfAlreadyExists(attributeInfo, attributesList); + if (index == -1) + { + attributesList.add(attributeInfo); + } + else + { + attributeInfo = new MBeanAttributeInfo(name.substring(3), + method.getParameterTypes()[0].getName(), + desc, + true, + true, + false); + attributesList.set(index, attributeInfo); + } + } + else if (isAttributeBoolean(method)) + { + attributeInfo = new MBeanAttributeInfo(name.substring(2), + resultType.getName(), + getAttributeDescription(method), + true, + false, + true); + attributesList.add(attributeInfo); + } + } + + return attributesList.toArray(new MBeanAttributeInfo[0]); + } + + /** + * Introspects the management interface class for management operations. + * @param interfaceClass + * @return MBeanOperationInfo[] + */ + static MBeanOperationInfo[] getMBeanOperationsInfo(Class interfaceClass) + { + List<MBeanOperationInfo> operationsList = new ArrayList<MBeanOperationInfo>(); + + for (Method method : interfaceClass.getMethods()) + { + if (!isAttributeGetterMethod(method) && + !isAttributeSetterMethod(method) && + !isAttributeBoolean(method)) + { + operationsList.add(getOperationInfo(method)); + } + } + + return operationsList.toArray(new MBeanOperationInfo[0]); + } + + /** + * Checks if the method is an attribute getter method. + * @param method + * @return true if the method is an attribute getter method. + */ + private static boolean isAttributeGetterMethod(Method method) + { + if (!(method.getName().equals("get")) && + method.getName().startsWith("get") && + method.getParameterTypes().length == 0 && + !method.getReturnType().equals(void.class)) + { + return true; + } + + return false; + } + + /** + * Checks if the method is an attribute setter method. + * @param method + * @return true if the method is an attribute setter method. + */ + private static boolean isAttributeSetterMethod(Method method) + { + if (!(method.getName().equals("set")) && + method.getName().startsWith("set") && + method.getParameterTypes().length == 1 && + method.getReturnType().equals(void.class)) + { + return true; + } + + return false; + } + + /** + * Checks if the attribute is a boolean and the method is a isX kind og method. + * @param method + * @return true if the method is an attribute isX type of method + */ + private static boolean isAttributeBoolean(Method method) + { + if (!(method.getName().equals("is")) && + method.getName().startsWith("is") && + method.getParameterTypes().length == 0 && + method.getReturnType().equals(boolean.class)) + { + return true; + } + + return false; + } + + /** + * Helper method to retrieve the attribute index from the list of attributes. + * @param attribute + * @param list + * @return attribute index no. -1 if attribtue doesn't exist + * @throws javax.management.NotCompliantMBeanException + */ + private static int getIndexIfAlreadyExists(MBeanAttributeInfo attribute, + List<MBeanAttributeInfo> list) + throws NotCompliantMBeanException + { + String exceptionMsg = "Conflicting attribute methods for attribute " + attribute.getName(); + + for (MBeanAttributeInfo memberAttribute : list) + { + if (attribute.getName().equals(memberAttribute.getName())) + { + if (!attribute.getType().equals(memberAttribute.getType())) + { + throw new NotCompliantMBeanException(exceptionMsg); + } + if (attribute.isReadable() && memberAttribute.isReadable()) + { + if (attribute.isIs() != memberAttribute.isIs()) + { + throw new NotCompliantMBeanException(exceptionMsg); + } + } + + return list.indexOf(memberAttribute); + } + } + + return -1; + } + + /** + * Retrieves the attribute description from annotation + * @param attributeMethod + * @return attribute description + */ + private static String getAttributeDescription(Method attributeMethod) + { + MBeanAttribute anno = attributeMethod.getAnnotation(MBeanAttribute.class); + if (anno != null) + { + return anno.description(); + } + return _defaultAttributeDescription; + } + + /** + * Introspects the method to retrieve the operation information. + * @param operation + * @return MBeanOperationInfo + */ + private static MBeanOperationInfo getOperationInfo(Method operation) + { + MBeanOperationInfo operationInfo = null; + Class<?> returnType = operation.getReturnType(); + + MBeanParameterInfo[] paramsInfo = getParametersInfo(operation.getParameterAnnotations(), + operation.getParameterTypes()); + + String operationDesc = _defaultOerationDescription; + int impact = MBeanOperationInfo.UNKNOWN; + + if (operation.getAnnotation(MBeanOperation.class) != null) + { + operationDesc = operation.getAnnotation(MBeanOperation.class).description(); + impact = operation.getAnnotation(MBeanOperation.class).impact(); + } + operationInfo = new MBeanOperationInfo(operation.getName(), + operationDesc, + paramsInfo, + returnType.getName(), + impact); + + return operationInfo; + } + + /** + * Constructs the parameter info. + * @param paramsAnno + * @param paramTypes + * @return MBeanParameterInfo[] + */ + private static MBeanParameterInfo[] getParametersInfo(Annotation[][] paramsAnno, + Class<?>[] paramTypes) + { + int noOfParams = paramsAnno.length; + + MBeanParameterInfo[] paramsInfo = new MBeanParameterInfo[noOfParams]; + + for (int i = 0; i < noOfParams; i++) + { + MBeanParameterInfo paramInfo = null; + String type = paramTypes[i].getName(); + for (Annotation anno : paramsAnno[i]) + { + String name,desc; + if (MBeanOperationParameter.class.isInstance(anno)) + { + name = MBeanOperationParameter.class.cast(anno).name(); + desc = MBeanOperationParameter.class.cast(anno).description(); + paramInfo = new MBeanParameterInfo(name, type, desc); + } + } + + + if (paramInfo == null) + { + paramInfo = new MBeanParameterInfo("p " + (i + 1), type, "parameter " + (i + 1)); + } + if (paramInfo != null) + { + paramsInfo[i] = paramInfo; + } + } + + return paramsInfo; + } + + /** + * Introspects the MBean class for constructors + * @param implClass + * @return MBeanConstructorInfo[] + */ + static MBeanConstructorInfo[] getMBeanConstructorsInfo(Class implClass) + { + List<MBeanConstructorInfo> constructors = new ArrayList<MBeanConstructorInfo>(); + + for (Constructor cons : implClass.getConstructors()) + { + MBeanConstructorInfo constructorInfo = getMBeanConstructorInfo(cons); + if (constructorInfo != null) + { + constructors.add(constructorInfo); + } + } + + return constructors.toArray(new MBeanConstructorInfo[0]); + } + + /** + * Retrieves the constructor info from given constructor. + * @param cons + * @return MBeanConstructorInfo + */ + private static MBeanConstructorInfo getMBeanConstructorInfo(Constructor cons) + { + String desc = _defaultConstructorDescription; + Annotation anno = cons.getAnnotation(MBeanConstructor.class); + if (anno != null && MBeanConstructor.class.isInstance(anno)) + { + desc = MBeanConstructor.class.cast(anno).value(); + if(desc == null) + { + desc = _defaultConstructorDescription; + } + } + + return new MBeanConstructorInfo(cons.getName(), desc, null); + } + + /** + * Retrieves the description from the annotations of given class + * @param annotatedClass + * @return class description + */ + static String getMBeanDescription(Class annotatedClass) + { + Annotation anno = annotatedClass.getAnnotation(MBeanDescription.class); + if (anno != null && MBeanDescription.class.isInstance(anno)) + { + return MBeanDescription.class.cast(anno).value(); + } + return _defaultMbeanDescription; + } + +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java new file mode 100644 index 0000000000..49f06d5121 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java @@ -0,0 +1,382 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx; + +import org.apache.log4j.Logger; + +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.ManagementActor; +import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.access.Operation; + +import javax.management.Attribute; +import javax.management.JMException; +import javax.management.MBeanInfo; +import javax.management.MBeanOperationInfo; +import javax.management.MBeanServer; +import javax.management.Notification; +import javax.management.NotificationListener; +import javax.management.ObjectName; +import javax.management.remote.JMXConnectionNotification; +import javax.management.remote.JMXPrincipal; +import javax.management.remote.MBeanServerForwarder; +import javax.security.auth.Subject; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.util.Map; +import java.util.Set; + +/** + * This class can be used by the JMXConnectorServer as an InvocationHandler for the mbean operations. It delegates + * JMX access decisions to the SecurityPlugin. + */ +public class MBeanInvocationHandlerImpl implements InvocationHandler, NotificationListener +{ + private static final Logger _logger = Logger.getLogger(MBeanInvocationHandlerImpl.class); + + private final IApplicationRegistry _appRegistry = ApplicationRegistry.getInstance(); + private final static String DELEGATE = "JMImplementation:type=MBeanServerDelegate"; + private MBeanServer _mbs; + private final ManagementActor _logActor = new ManagementActor(_appRegistry.getRootMessageLogger()); + private final boolean _managementRightsInferAllAccess = + _appRegistry.getConfiguration().getManagementRightsInferAllAccess(); + + public static MBeanServerForwarder newProxyInstance() + { + final InvocationHandler handler = new MBeanInvocationHandlerImpl(); + final Class<?>[] interfaces = new Class[] { MBeanServerForwarder.class }; + + Object proxy = Proxy.newProxyInstance(MBeanServerForwarder.class.getClassLoader(), interfaces, handler); + return MBeanServerForwarder.class.cast(proxy); + } + + private boolean invokeDirectly(String methodName, Object[] args, Subject subject) + { + // Allow operations performed locally on behalf of the connector server itself + if (subject == null) + { + return true; + } + + if (args == null || DELEGATE.equals(args[0])) + { + return true; + } + + // Allow querying available object names and mbeans + if (methodName.equals("queryNames") || methodName.equals("queryMBeans")) + { + return true; + } + + if (args[0] instanceof ObjectName) + { + ObjectName mbean = (ObjectName) args[0]; + + if(!DefaultManagedObject.DOMAIN.equalsIgnoreCase(mbean.getDomain())) + { + return true; + } + } + + return false; + } + + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable + { + String methodName = method.getName(); + + if (methodName.equals("getMBeanServer")) + { + return _mbs; + } + + if (methodName.equals("setMBeanServer")) + { + if (args[0] == null) + { + throw new IllegalArgumentException("Null MBeanServer"); + } + if (_mbs != null) + { + throw new IllegalArgumentException("MBeanServer object already initialized"); + } + _mbs = (MBeanServer) args[0]; + return null; + } + + // Restrict access to "createMBean" and "unregisterMBean" to any user + if (methodName.equals("createMBean") || methodName.equals("unregisterMBean")) + { + _logger.debug("User trying to create or unregister an MBean"); + throw new SecurityException("Access denied: " + methodName); + } + + // Retrieve Subject from current AccessControlContext + AccessControlContext acc = AccessController.getContext(); + Subject subject = Subject.getSubject(acc); + + try + { + if(invokeDirectly(methodName, args, subject)) + { + return method.invoke(_mbs, args); + } + + // Retrieve JMXPrincipal from Subject + Set<JMXPrincipal> principals = subject.getPrincipals(JMXPrincipal.class); + if (principals == null || principals.isEmpty()) + { + throw new SecurityException("Access denied: no JMX principal"); + } + + // Save the subject + SecurityManager.setThreadSubject(subject); + + // Get the component, type and impact, which may be null + String type = getType(method, args); + String vhost = getVirtualHost(method, args); + int impact = getImpact(method, args); + + // Get the security manager for the virtual host (if set) + SecurityManager security; + if (vhost == null) + { + security = _appRegistry.getSecurityManager(); + } + else + { + security = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhost).getSecurityManager(); + } + + methodName = getMethodName(method, args); + if (isAccessMethod(methodName) || impact == MBeanOperationInfo.INFO) + { + // Check for read-only method invocation permission + if (!security.authoriseMethod(Operation.ACCESS, type, methodName)) + { + throw new SecurityException("Permission denied: Access " + methodName); + } + } + else + { + // Check for setting properties permission + if (!security.authoriseMethod(Operation.UPDATE, type, methodName)) + { + throw new SecurityException("Permission denied: Update " + methodName); + } + } + + boolean oldAccessChecksDisabled = false; + if(_managementRightsInferAllAccess) + { + oldAccessChecksDisabled = SecurityManager.setAccessChecksDisabled(true); + } + + try + { + return doInvokeWrappingWithManagementActor(method, args); + } + finally + { + if(_managementRightsInferAllAccess) + { + SecurityManager.setAccessChecksDisabled(oldAccessChecksDisabled); + } + } + } + catch (InvocationTargetException e) + { + throw e.getTargetException(); + } + } + + private Object doInvokeWrappingWithManagementActor(Method method, + Object[] args) throws IllegalAccessException, + InvocationTargetException + { + try + { + CurrentActor.set(_logActor); + return method.invoke(_mbs, args); + } + finally + { + CurrentActor.remove(); + } + } + + private String getType(Method method, Object[] args) + { + if (args[0] instanceof ObjectName) + { + ObjectName object = (ObjectName) args[0]; + String type = object.getKeyProperty("type"); + + return type; + } + return null; + } + + private String getVirtualHost(Method method, Object[] args) + { + if (args[0] instanceof ObjectName) + { + ObjectName object = (ObjectName) args[0]; + String vhost = object.getKeyProperty("VirtualHost"); + + if(vhost != null) + { + try + { + //if the name is quoted in the ObjectName, unquote it + vhost = ObjectName.unquote(vhost); + } + catch(IllegalArgumentException e) + { + //ignore, this just means the name is not quoted + //and can be left unchanged + } + } + + return vhost; + } + return null; + } + + private String getMethodName(Method method, Object[] args) + { + String methodName = method.getName(); + + // if arguments are set, try and work out real method name + if (args != null && args.length >= 1 && args[0] instanceof ObjectName) + { + if (methodName.equals("getAttribute")) + { + methodName = "get" + (String) args[1]; + } + else if (methodName.equals("setAttribute")) + { + methodName = "set" + ((Attribute) args[1]).getName(); + } + else if (methodName.equals("invoke")) + { + methodName = (String) args[1]; + } + } + + return methodName; + } + + private int getImpact(Method method, Object[] args) + { + //handle invocation of other methods on mbeans + if ((args[0] instanceof ObjectName) && (method.getName().equals("invoke"))) + { + //get invoked method name + String mbeanMethod = (args.length > 1) ? (String) args[1] : null; + if (mbeanMethod == null) + { + return -1; + } + + try + { + //Get the impact attribute + MBeanInfo mbeanInfo = _mbs.getMBeanInfo((ObjectName) args[0]); + if (mbeanInfo != null) + { + MBeanOperationInfo[] opInfos = mbeanInfo.getOperations(); + for (MBeanOperationInfo opInfo : opInfos) + { + if (opInfo.getName().equals(mbeanMethod)) + { + return opInfo.getImpact(); + } + } + } + } + catch (JMException ex) + { + _logger.error("Unable to determine mbean impact for method : " + mbeanMethod, ex); + } + } + + return -1; + } + + private boolean isAccessMethod(String methodName) + { + //handle standard get/query/is methods from MBeanServer + return (methodName.startsWith("query") || methodName.startsWith("get") || methodName.startsWith("is")); + } + + /** + * Receives notifications from the MBeanServer. + */ + public void handleNotification(final Notification notification, final Object handback) + { + assert notification instanceof JMXConnectionNotification; + + final String connectionId = ((JMXConnectionNotification) notification).getConnectionId(); + final String type = notification.getType(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Notification connectionId : " + connectionId + " type : " + type + + " Notification handback : " + handback); + } + + // Normally JMXManagedObjectRegistry provides a Map as handback data containing a map + // between connection id and username. + String user = null; + if (handback instanceof Map) + { + final Map<String, String> connectionIdUsernameMap = (Map<String, String>) handback; + user = connectionIdUsernameMap.get(connectionId); + } + + // If user is still null, fallback to an unordered list of Principals from the connection id. + if (user == null) + { + final String[] splitConnectionId = connectionId.split(" "); + user = splitConnectionId[1]; + } + + if (JMXConnectionNotification.OPENED.equals(type)) + { + _logActor.message(ManagementConsoleMessages.OPEN(user)); + } + else if (JMXConnectionNotification.CLOSED.equals(type) || + JMXConnectionNotification.FAILED.equals(type)) + { + _logActor.message(ManagementConsoleMessages.CLOSE(user)); + } + } +} + diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java new file mode 100644 index 0000000000..83909dbe72 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.jmx; + +import java.util.ServiceLoader; + +import javax.management.JMException; +import javax.management.StandardMBean; + +import org.apache.qpid.server.model.ConfiguredObject; + +/** + * A provider of an mbean implementation. + * + * Provider implementations are advertised as services and loaded via {@link ServiceLoader}. + */ +public interface MBeanProvider +{ + /** + * Tests whether a <code>child</code> can be managed by the mbean + * provided by this provider. + */ + boolean isChildManageableByMBean(ConfiguredObject child); + + /** + * Creates a mbean for this child. This method should only be called if + * {@link #isChildManageableByMBean(ConfiguredObject)} has previously returned true. + * + * @return newly created mbean + */ + StandardMBean createMBean(ConfiguredObject child, StandardMBean parent) throws JMException; + +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObject.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObject.java new file mode 100644 index 0000000000..40b778fd93 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObject.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx; + +import javax.management.JMException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +/** + * This should be implemented by all Managable objects. + */ +public interface ManagedObject +{ + static final String DOMAIN = "org.apache.qpid"; + + /** + * @return the name that uniquely identifies this object instance. It must be + * unique only among objects of this type at this level in the hierarchy so + * the uniqueness should not be too difficult to ensure. + */ + String getObjectInstanceName(); + + String getType(); + + Class<?> getManagementInterface(); + + ManagedObject getParentObject(); + + void register() throws JMException; + + void unregister() throws JMException; + + /** + * Returns the ObjectName required for the mbeanserver registration. + * @return ObjectName + * @throws javax.management.MalformedObjectNameException + */ + ObjectName getObjectName() throws MalformedObjectNameException; +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObjectRegistry.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObjectRegistry.java new file mode 100644 index 0000000000..2ae0ac7052 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObjectRegistry.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.common.Closeable; + +import javax.management.JMException; +import java.io.IOException; + +/** + * Handles the registration (and unregistration and so on) of managed objects. + * + * Managed objects are responsible for exposting attributes, operations and notifications. They will expose + * these outside the JVM therefore it is important not to use implementation objects directly as managed objects. + * Instead, creating inner classes and exposing those is an effective way of exposing internal state in a + * controlled way. + * + * Although we do not explictly use them while targetting Java 5, the enhanced MXBean approach in Java 6 will + * be the obvious choice for managed objects. + * + */ +public interface ManagedObjectRegistry extends Closeable +{ + void start() throws IOException, ConfigurationException; + + void registerObject(ManagedObject managedObject) throws JMException; + + void unregisterObject(ManagedObject managedObject) throws JMException; +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/AbstractStatisticsGatheringMBean.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/AbstractStatisticsGatheringMBean.java new file mode 100644 index 0000000000..4115f9f363 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/AbstractStatisticsGatheringMBean.java @@ -0,0 +1,196 @@ +package org.apache.qpid.server.jmx.mbeans; + +import javax.management.NotCompliantMBeanException; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.jmx.AMQManagedObject; +import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.model.VirtualHost; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +abstract class AbstractStatisticsGatheringMBean<T extends ConfiguredObject> extends AMQManagedObject +{ + private long _lastStatUpdateTime; + private long _statUpdatePeriod = 5000L; + private long _lastMessagesReceived; + private long _lastMessagesSent; + private long _lastBytesReceived; + private long _lastBytesSent; + private double _messageReceivedRate; + private double _messageSentRate; + private double _bytesReceivedRate; + private double _bytesSentRate; + private double _peakMessageReceivedRate; + private double _peakMessageSentRate; + private double _peakBytesReceivedRate; + private double _peakBytesSentRate; + private final T _configuredObject; + + protected AbstractStatisticsGatheringMBean(Class<?> managementInterface, + String typeName, + ManagedObjectRegistry registry, + T object) throws NotCompliantMBeanException + { + super(managementInterface, typeName, registry); + _configuredObject = object; + initStats(); + } + + protected void initStats() + { + _lastStatUpdateTime = System.currentTimeMillis(); + } + + protected synchronized void updateStats() + { + long time = System.currentTimeMillis(); + final long period = time - _lastStatUpdateTime; + if(period > _statUpdatePeriod) + { + long messagesReceived = getStatistic(VirtualHost.MESSAGES_IN); + long messagesSent = getStatistic(VirtualHost.MESSAGES_OUT); + long bytesReceived = getStatistic(VirtualHost.BYTES_IN); + long bytesSent = getStatistic(VirtualHost.BYTES_OUT); + + double messageReceivedRate = (double)(messagesReceived - _lastMessagesReceived) / (double)period; + double messageSentRate = (double)(messagesSent - _lastMessagesSent) / (double)period; + double bytesReceivedRate = (double)(bytesReceived - _lastBytesReceived) / (double)period; + double bytesSentRate = (double)(bytesSent - _lastBytesSent) / (double)period; + + _lastMessagesReceived = messagesReceived; + _lastMessagesSent = messagesSent; + _lastBytesReceived = bytesReceived; + _lastBytesSent = bytesSent; + + _messageReceivedRate = messageReceivedRate; + _messageSentRate = messageSentRate; + _bytesReceivedRate = bytesReceivedRate; + _bytesSentRate = bytesSentRate; + + if(messageReceivedRate > _peakMessageReceivedRate) + { + _peakMessageReceivedRate = messageReceivedRate; + } + + if(messageSentRate > _peakMessageSentRate) + { + _peakMessageSentRate = messageSentRate; + } + + if(bytesReceivedRate > _peakBytesReceivedRate) + { + _peakBytesReceivedRate = bytesReceivedRate; + } + + if(bytesSentRate > _peakBytesSentRate) + { + _peakBytesSentRate = bytesSentRate; + } + + } + } + + private long getStatistic(String name) + { + return (Long) getConfiguredObject().getStatistics().getStatistic(name); + } + + public synchronized void resetStatistics() throws Exception + { + updateStats(); + //TODO - implement resetStatistics() + } + + public synchronized double getPeakMessageDeliveryRate() + { + updateStats(); + return _peakMessageSentRate; + } + + public synchronized double getPeakDataDeliveryRate() + { + updateStats(); + return _peakBytesSentRate; + } + + public synchronized double getMessageDeliveryRate() + { + updateStats(); + return _messageSentRate; + } + + public synchronized double getDataDeliveryRate() + { + updateStats(); + return _bytesSentRate; + } + + public synchronized long getTotalMessagesDelivered() + { + updateStats(); + return getStatistic(Connection.MESSAGES_OUT); + } + + public synchronized long getTotalDataDelivered() + { + updateStats(); + return getStatistic(Connection.BYTES_OUT); + } + + protected final T getConfiguredObject() + { + return _configuredObject; + } + + public synchronized double getPeakMessageReceiptRate() + { + updateStats(); + return _peakMessageReceivedRate; + } + + public synchronized double getPeakDataReceiptRate() + { + updateStats(); + return _peakBytesReceivedRate; + } + + public synchronized double getMessageReceiptRate() + { + updateStats(); + return _messageReceivedRate; + } + + public synchronized double getDataReceiptRate() + { + updateStats(); + return _bytesReceivedRate; + } + + public synchronized long getTotalMessagesReceived() + { + updateStats(); + return getStatistic(Connection.MESSAGES_IN); + } + + public synchronized long getTotalDataReceived() + { + updateStats(); + return getStatistic(Connection.BYTES_IN); + } + +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConfigurationManagementMBean.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConfigurationManagementMBean.java new file mode 100644 index 0000000000..beffb4eaa9 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConfigurationManagementMBean.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx.mbeans; + +import org.apache.qpid.management.common.mbeans.ConfigurationManagement; +import org.apache.qpid.server.jmx.AMQManagedObject; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; + +import javax.management.JMException; +import javax.management.NotCompliantMBeanException; + +public class ConfigurationManagementMBean extends AMQManagedObject implements ConfigurationManagement +{ + + public ConfigurationManagementMBean(ManagedObjectRegistry registry) throws JMException + { + super(ConfigurationManagement.class, ConfigurationManagement.TYPE, registry); + register(); + } + + public String getObjectInstanceName() + { + return ConfigurationManagement.TYPE; + } + + public void reloadSecurityConfiguration() throws Exception + { + ApplicationRegistry.getInstance().getConfiguration().reparseConfigFileSecuritySections(); + } + + @Override + public ManagedObject getParentObject() + { + return null; + } +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java new file mode 100644 index 0000000000..024ee39318 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java @@ -0,0 +1,183 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.jmx.mbeans; + +import java.io.IOException; +import java.util.Collection; +import java.util.Date; +import javax.management.JMException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; + +import org.apache.qpid.management.common.mbeans.ManagedConnection; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.Session; +import org.apache.qpid.server.model.Statistics; + +public class ConnectionMBean extends AbstractStatisticsGatheringMBean<Connection> implements ManagedConnection +{ + private static final OpenType[] CHANNEL_ATTRIBUTE_TYPES = + { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER, SimpleType.BOOLEAN }; + private static final CompositeType CHANNEL_TYPE; + private static final TabularType CHANNELS_TYPE; + + static + { + try + { + CHANNEL_TYPE = new CompositeType("Channel", "Channel Details", COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), + COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), + CHANNEL_ATTRIBUTE_TYPES); + CHANNELS_TYPE = new TabularType("Channels", "Channels", CHANNEL_TYPE, (String[]) TABULAR_UNIQUE_INDEX.toArray(new String[TABULAR_UNIQUE_INDEX.size()])); + } + catch (JMException ex) + { + // This is not expected to ever occur. + throw new RuntimeException("Got JMException in static initializer.", ex); + } + } + + + private final VirtualHostMBean _virtualHostMBean; + + public ConnectionMBean(Connection conn, VirtualHostMBean virtualHostMBean) throws JMException + { + super(ManagedConnection.class, ManagedConnection.TYPE, virtualHostMBean.getRegistry(), conn); + _virtualHostMBean = virtualHostMBean; + register(); + } + + public String getObjectInstanceName() + { + return ObjectName.quote(getRemoteAddress()); + } + + @Override + public ManagedObject getParentObject() + { + return _virtualHostMBean; + } + + public String getClientId() + { + return (String) getConfiguredObject().getAttribute(Connection.CLIENT_ID); + } + + public String getAuthorizedId() + { + return (String) getConfiguredObject().getAttribute(Connection.PRINCIPAL); + } + + public String getVersion() + { + return (String) getConfiguredObject().getAttribute(Connection.CLIENT_VERSION); + } + + public String getRemoteAddress() + { + return (String) getConfiguredObject().getAttribute(Connection.REMOTE_ADDRESS); + } + + public Date getLastIoTime() + { + Long lastIo = (Long) getConfiguredObject().getStatistics().getStatistic(Connection.LAST_IO_TIME); + return new Date(lastIo); + } + + public Long getMaximumNumberOfChannels() + { + return (Long) getConfiguredObject().getAttribute(Connection.SESSION_COUNT_LIMIT); + } + + public TabularData channels() throws IOException, JMException + { + TabularDataSupport sessionTable = new TabularDataSupport(CHANNELS_TYPE); + Collection<Session> list = getConfiguredObject().getSessions(); + + for (Session session : list) + { + Statistics statistics = session.getStatistics(); + Long txnBegins = (Long) statistics.getStatistic(Session.LOCAL_TRANSACTION_BEGINS); + Integer channelId = (Integer) session.getAttribute(Session.CHANNEL_ID); + int unacknowledgedSize = ((Number) statistics.getStatistic(Session.UNACKNOWLEDGED_MESSAGES)).intValue(); + boolean blocked = (Boolean) session.getAttribute(Session.PRODUCER_FLOW_BLOCKED); + boolean isTransactional = (txnBegins>0l); + + Object[] itemValues = + { + channelId, + isTransactional, + null, // TODO - default queue (which is meaningless) + unacknowledgedSize, + blocked + }; + + CompositeData sessionData = new CompositeDataSupport(CHANNEL_TYPE, + COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), itemValues); + sessionTable.put(sessionData); + } + + return sessionTable; + } + + public void commitTransactions(int channelId) throws JMException + { + throw buildUnsupportedException(); + } + + public void rollbackTransactions(int channelId) throws JMException + { + throw buildUnsupportedException(); + } + + public void closeConnection() throws Exception + { + getConfiguredObject().delete(); + } + + public boolean isStatisticsEnabled() + { + return true; + } + + public void setStatisticsEnabled(boolean enabled) + { + // TODO - Implement setStatisticsEnabled + updateStats(); + } + + private JMException buildUnsupportedException() throws JMException + { + String msg = "Operation not supported"; + JMException jmException = new JMException(msg); + jmException.initCause(new UnsupportedOperationException(msg)); + return jmException; + } +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java new file mode 100644 index 0000000000..eb7e716af8 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java @@ -0,0 +1,323 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.jmx.mbeans; + +import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter; +import org.apache.qpid.server.jmx.AMQManagedObject; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; + +import javax.management.JMException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ExchangeMBean extends AMQManagedObject implements ManagedExchange +{ + + private static final String[] TABULAR_UNIQUE_INDEX_ARRAY = + TABULAR_UNIQUE_INDEX.toArray(new String[TABULAR_UNIQUE_INDEX.size()]); + + private static final String[] COMPOSITE_ITEM_NAMES_ARRAY = + COMPOSITE_ITEM_NAMES.toArray(new String[COMPOSITE_ITEM_NAMES.size()]); + + private static final String[] COMPOSITE_ITEM_DESCRIPTIONS_ARRAY = + COMPOSITE_ITEM_DESCRIPTIONS.toArray(new String[COMPOSITE_ITEM_DESCRIPTIONS.size()]); + + private static final OpenType[] BINDING_ITEM_TYPES; + private static final CompositeType BINDING_DATA_TYPE; + private static final OpenType[] HEADERS_BINDING_ITEM_TYPES; + + + private static final CompositeType HEADERS_BINDING_DATA_TYPE; + + private static final String[] HEADERS_COMPOSITE_ITEM_NAMES_ARRAY = + HEADERS_COMPOSITE_ITEM_NAMES.toArray(new String[HEADERS_COMPOSITE_ITEM_NAMES.size()]); + + private static final String[] HEADERS_COMPOSITE_ITEM_DESCS_ARRAY = + HEADERS_COMPOSITE_ITEM_DESC.toArray(new String[HEADERS_COMPOSITE_ITEM_DESC.size()]); + private static final String[] HEADERS_TABULAR_UNIQUE_INDEX_ARRAY = + HEADERS_TABULAR_UNIQUE_INDEX.toArray(new String[HEADERS_TABULAR_UNIQUE_INDEX.size()]); + public static final String HEADERS_EXCHANGE_TYPE = "headers"; + + static + { + try + { + BINDING_ITEM_TYPES = new OpenType[] {SimpleType.STRING, new ArrayType(1, SimpleType.STRING)}; + + BINDING_DATA_TYPE= new CompositeType("Exchange Binding", "Binding key and Queue names", + COMPOSITE_ITEM_NAMES_ARRAY, + COMPOSITE_ITEM_DESCRIPTIONS_ARRAY, + BINDING_ITEM_TYPES); + + HEADERS_BINDING_ITEM_TYPES = new OpenType[] {SimpleType.INTEGER, + SimpleType.STRING, + new ArrayType(1, SimpleType.STRING)}; + + HEADERS_BINDING_DATA_TYPE = new CompositeType("Exchange Binding", "Queue name and header bindings", + HEADERS_COMPOSITE_ITEM_NAMES_ARRAY, + HEADERS_COMPOSITE_ITEM_DESCS_ARRAY, + HEADERS_BINDING_ITEM_TYPES); + + + } + catch(OpenDataException e) + { + throw new RuntimeException("Unexpected Error creating ArrayType", e); + } + } + + + private final Exchange _exchange; + private final VirtualHostMBean _vhostMBean; + + protected ExchangeMBean(Exchange exchange, VirtualHostMBean virtualHostMBean) + throws JMException + { + super(ManagedExchange.class, ManagedExchange.TYPE, virtualHostMBean.getRegistry()); + _exchange = exchange; + _vhostMBean = virtualHostMBean; + + register(); + } + + public String getObjectInstanceName() + { + return ObjectName.quote(getName()); + } + + @Override + public ManagedObject getParentObject() + { + return _vhostMBean; + } + + public ObjectName getObjectName() throws MalformedObjectNameException + { + String objNameString = super.getObjectName().toString(); + objNameString = objNameString + ",ExchangeType=" + getExchangeType(); + return new ObjectName(objNameString); + } + + + public String getName() + { + return _exchange.getName(); + } + + public String getExchangeType() + { + return _exchange.getExchangeType(); + } + + public Integer getTicketNo() + { + return 0; + } + + public boolean isDurable() + { + return _exchange.isDurable(); + } + + public boolean isAutoDelete() + { + return _exchange.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE; + } + + public TabularData bindings() throws IOException, JMException + { + if(HEADERS_EXCHANGE_TYPE.equals(_exchange.getExchangeType())) + { + return getHeadersBindings(_exchange.getBindings()); + } + else + { + return getNonHeadersBindings(_exchange.getBindings()); + } + } + + + private TabularData getHeadersBindings(Collection<Binding> bindings) throws OpenDataException + { + TabularType bindinglistDataType = + new TabularType("Exchange Bindings", "List of exchange bindings for " + getName(), + HEADERS_BINDING_DATA_TYPE, + HEADERS_TABULAR_UNIQUE_INDEX_ARRAY); + + TabularDataSupport bindingList = new TabularDataSupport(bindinglistDataType); + int count = 1; + for (Binding binding : bindings) + { + + String queueName = binding.getParent(Queue.class).getName(); + + + Map<String,Object> headerMappings = binding.getArguments(); + + final List<String> mappingList = new ArrayList<String>(); + + if(headerMappings != null) + { + for(Map.Entry<String,Object> entry : headerMappings.entrySet()) + { + + mappingList.add(entry.getKey() + "=" + entry.getValue()); + } + } + + + Object[] bindingItemValues = {count++, queueName, mappingList.toArray(new String[0])}; + CompositeData bindingData = new CompositeDataSupport(HEADERS_BINDING_DATA_TYPE, + HEADERS_COMPOSITE_ITEM_NAMES_ARRAY, + bindingItemValues); + bindingList.put(bindingData); + } + + return bindingList; + + } + + private TabularData getNonHeadersBindings(Collection<Binding> bindings) throws OpenDataException + { + + TabularType bindinglistDataType = + new TabularType("Exchange Bindings", "Exchange Bindings for " + getName(), + BINDING_DATA_TYPE, + TABULAR_UNIQUE_INDEX_ARRAY); + + TabularDataSupport bindingList = new TabularDataSupport(bindinglistDataType); + + Map<String, List<String>> bindingMap = new HashMap<String, List<String>>(); + + for (Binding binding : bindings) + { + String key = "fanout".equals(_exchange.getExchangeType()) ? "*" : binding.getName(); + List<String> queueList = bindingMap.get(key); + if(queueList == null) + { + queueList = new ArrayList<String>(); + bindingMap.put(key, queueList); + } + queueList.add(binding.getParent(Queue.class).getName()); + + } + + for(Map.Entry<String, List<String>> entry : bindingMap.entrySet()) + { + Object[] bindingItemValues = {entry.getKey(), entry.getValue().toArray(new String[0])}; + CompositeData bindingData = new CompositeDataSupport(BINDING_DATA_TYPE, + COMPOSITE_ITEM_NAMES_ARRAY, + bindingItemValues); + bindingList.put(bindingData); + } + + return bindingList; + } + + public void createNewBinding(String queueName, String binding) throws JMException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + + if(HEADERS_EXCHANGE_TYPE.equals(_exchange.getExchangeType())) + { + final String[] bindings = binding.split(","); + for (int i = 0; i < bindings.length; i++) + { + final String[] keyAndValue = bindings[i].split("="); + if (keyAndValue == null || keyAndValue.length == 0 || keyAndValue.length > 2 || keyAndValue[0].length() == 0) + { + throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" "); + } + + if(keyAndValue.length == 1) + { + //no value was given, only a key. Use an empty value to signal match on key presence alone + arguments.put(keyAndValue[0], ""); + } + else + { + arguments.put(keyAndValue[0], keyAndValue[1]); + } + } + } + + Queue queue = null; + VirtualHost vhost = _exchange.getParent(VirtualHost.class); + for(Queue aQueue : vhost.getQueues()) + { + if(aQueue.getName().equals(queueName)) + { + queue = aQueue; + break; + } + } + _exchange.createBinding(binding, queue, arguments, Collections.EMPTY_MAP); + } + + public void removeBinding(String queueName, String bindingKey) + throws IOException, JMException + { + Queue queue = null; + VirtualHost vhost = _exchange.getParent(VirtualHost.class); + for(Queue aQueue : vhost.getQueues()) + { + if(aQueue.getName().equals(queueName)) + { + queue = aQueue; + break; + } + } + + for(Binding binding : _exchange.getBindings()) + { + if(queue.equals(binding.getParent(Queue.class)) && bindingKey.equals(binding.getName())) + { + binding.delete(); + } + } + + } +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBean.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBean.java new file mode 100644 index 0000000000..9ff45979ca --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBean.java @@ -0,0 +1,832 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.jmx.mbeans; + +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.log4j.xml.Log4jEntityResolver; +import org.apache.log4j.xml.QpidLog4JConfigurator; +import org.apache.log4j.xml.QpidLog4JConfigurator.IllegalLoggerLevelException; +import org.apache.log4j.xml.QpidLog4JConfigurator.QpidLog4JSaxErrorHandler; +import org.apache.qpid.management.common.mbeans.LoggingManagement; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.server.jmx.AMQManagedObject; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; +import org.xml.sax.ErrorHandler; +import org.xml.sax.SAXException; + +import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static org.apache.log4j.xml.QpidLog4JConfigurator.LOCK; + + +/** MBean class for BrokerLoggingManagerMBean. It implements all the management features exposed for managing logging. */ +@MBeanDescription("Logging Management Interface") +public class LoggingManagementMBean extends AMQManagedObject implements LoggingManagement +{ + + private static final Logger _logger = Logger.getLogger(LoggingManagementMBean.class); + private String _log4jConfigFileName; + private int _log4jLogWatchInterval; + private static final String INHERITED = "INHERITED"; + private static final String[] LEVELS = new String[]{Level.ALL.toString(), Level.TRACE.toString(), + Level.DEBUG.toString(), Level.INFO.toString(), + Level.WARN.toString(), Level.ERROR.toString(), + Level.FATAL.toString(),Level.OFF.toString(), + INHERITED}; + private static TabularType _loggerLevelTabularType; + private static CompositeType _loggerLevelCompositeType; + + static + { + try + { + OpenType[] loggerLevelItemTypes = new OpenType[]{SimpleType.STRING, SimpleType.STRING}; + + _loggerLevelCompositeType = new CompositeType("LoggerLevelList", "Logger Level Data", + COMPOSITE_ITEM_NAMES.toArray(new String[COMPOSITE_ITEM_NAMES.size()]), + COMPOSITE_ITEM_DESCRIPTIONS.toArray(new String[COMPOSITE_ITEM_DESCRIPTIONS.size()]), + loggerLevelItemTypes); + + _loggerLevelTabularType = new TabularType("LoggerLevel", "List of loggers with levels", + _loggerLevelCompositeType, + TABULAR_UNIQUE_INDEX.toArray(new String[TABULAR_UNIQUE_INDEX.size()])); + } + catch (OpenDataException e) + { + _logger.error("Tabular data setup for viewing logger levels was incorrect."); + _loggerLevelTabularType = null; + } + } + + public LoggingManagementMBean(String log4jConfigFileName, + int log4jLogWatchInterval, + ManagedObjectRegistry registry) throws JMException + { + super(LoggingManagement.class, LoggingManagement.TYPE, registry); + _log4jConfigFileName = log4jConfigFileName; + _log4jLogWatchInterval = log4jLogWatchInterval; + register(); + } + + public String getObjectInstanceName() + { + return LoggingManagement.TYPE; + } + + public Integer getLog4jLogWatchInterval() + { + return _log4jLogWatchInterval; + } + + public String[] getAvailableLoggerLevels() + { + return LEVELS; + } + @SuppressWarnings("unchecked") + public synchronized boolean setRuntimeLoggerLevel(String logger, String level) + { + //check specified level is valid + Level newLevel; + try + { + newLevel = getLevel(level); + } + catch (Exception e) + { + return false; + } + + //check specified logger exists + Enumeration loggers = LogManager.getCurrentLoggers(); + Boolean loggerExists = false; + + while(loggers.hasMoreElements()) + { + Logger log = (Logger) loggers.nextElement(); + if (log.getName().equals(logger)) + { + loggerExists = true; + break; + } + } + + if(!loggerExists) + { + return false; + } + + //set the logger to the new level + _logger.info("Setting level to " + level + " for logger: " + logger); + + Logger log = Logger.getLogger(logger); + log.setLevel(newLevel); + + return true; + } + + @SuppressWarnings("unchecked") + public synchronized TabularData viewEffectiveRuntimeLoggerLevels() + { + if (_loggerLevelTabularType == null) + { + _logger.warn("TabluarData type not set up correctly"); + return null; + } + + _logger.info("Getting levels for currently active log4j loggers"); + + Enumeration loggers = LogManager.getCurrentLoggers(); + + TabularData loggerLevelList = new TabularDataSupport(_loggerLevelTabularType); + + Logger logger; + String loggerName; + String level; + + try + { + while(loggers.hasMoreElements()){ + logger = (Logger) loggers.nextElement(); + + loggerName = logger.getName(); + level = logger.getEffectiveLevel().toString(); + + Object[] itemData = {loggerName, level}; + CompositeData loggerData = new CompositeDataSupport(_loggerLevelCompositeType, + COMPOSITE_ITEM_NAMES.toArray(new String[COMPOSITE_ITEM_NAMES.size()]), itemData); + loggerLevelList.put(loggerData); + } + } + catch (OpenDataException e) + { + _logger.warn("Unable to create logger level list due to :" + e); + return null; + } + + return loggerLevelList; + + } + + public synchronized String getRuntimeRootLoggerLevel() + { + Logger rootLogger = Logger.getRootLogger(); + + return rootLogger.getLevel().toString(); + } + + public synchronized boolean setRuntimeRootLoggerLevel(String level) + { + Level newLevel; + try + { + newLevel = getLevel(level); + } + catch (Exception e) + { + return false; + } + + if(newLevel == null) + { + //A null Level reference implies inheritance. Setting the runtime RootLogger + //to null is catastrophic (and prevented by Log4J at startup and runtime anyway). + return false; + } + + _logger.info("Setting RootLogger level to " + level); + + Logger log = Logger.getRootLogger(); + log.setLevel(newLevel); + + return true; + } + + //method to convert from a string to a log4j Level, throws exception if the given value is invalid + private Level getLevel(String level) throws Exception + { + if("null".equalsIgnoreCase(level) || INHERITED.equalsIgnoreCase(level)) + { + //the string "null" or "inherited" signals to inherit from a parent logger, + //using a null Level reference for the logger. + return null; + } + + Level newLevel = Level.toLevel(level); + + //above Level.toLevel call returns a DEBUG Level if the request fails. Check the result. + if (newLevel.equals(Level.DEBUG) && !(level.equalsIgnoreCase("debug"))) + { + //received DEBUG but we did not ask for it, the Level request failed. + throw new Exception("Invalid level name"); + } + + return newLevel; + } + + //method to parse the XML configuration file, validating it in the process, and returning a DOM Document of the content. + private static synchronized Document parseConfigFile(String fileName) throws IOException + { + try + { + LOCK.lock(); + + //check file was specified, exists, and is readable + if(fileName == null) + { + _logger.warn("Provided log4j XML configuration filename is null"); + throw new IOException("Provided log4j XML configuration filename is null"); + } + + File configFile = new File(fileName); + + if (!configFile.exists()) + { + _logger.warn("The log4j XML configuration file could not be found: " + fileName); + throw new IOException("The log4j XML configuration file could not be found"); + } + else if (!configFile.canRead()) + { + _logger.warn("The log4j XML configuration file is not readable: " + fileName); + throw new IOException("The log4j XML configuration file is not readable"); + } + + //parse it + DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder docBuilder; + Document doc; + + ErrorHandler errHandler = new QpidLog4JSaxErrorHandler(); + try + { + docFactory.setValidating(true); + docBuilder = docFactory.newDocumentBuilder(); + docBuilder.setErrorHandler(errHandler); + docBuilder.setEntityResolver(new Log4jEntityResolver()); + doc = docBuilder.parse(fileName); + } + catch (ParserConfigurationException e) + { + _logger.warn("Unable to parse the log4j XML file due to possible configuration error: " + e); + //recommended that MBeans should use java.* and javax.* exceptions only + throw new IOException("Unable to parse the log4j XML file due to possible configuration error: " + e.getMessage()); + } + catch (SAXException e) + { + _logger.warn("The specified log4j XML file is invalid: " + e); + //recommended that MBeans should use standard java.* and javax.* exceptions only + throw new IOException("The specified log4j XML file is invalid: " + e.getMessage()); + } + catch (IOException e) + { + _logger.warn("Unable to parse the specified log4j XML file" + e); + throw new IOException("Unable to parse the specified log4j XML file: " + e.getMessage()); + } + + return doc; + } + finally + { + LOCK.unlock(); + } + } + + + private static synchronized boolean writeUpdatedConfigFile(String log4jConfigFileName, Document doc) throws IOException + { + try + { + LOCK.lock(); + + File log4jConfigFile = new File(log4jConfigFileName); + + if (!log4jConfigFile.canWrite()) + { + _logger.warn("Specified log4j XML configuration file is not writable: " + log4jConfigFile); + throw new IOException("Specified log4j XML configuration file is not writable"); + } + + Transformer transformer = null; + try + { + transformer = TransformerFactory.newInstance().newTransformer(); + } + catch (Exception e) + { + _logger.warn("Could not create an XML transformer: " +e); + return false; + } + + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + transformer.setOutputProperty(OutputKeys.DOCTYPE_SYSTEM, "log4j.dtd"); + DOMSource source = new DOMSource(doc); + + File tmp; + Random r = new Random(); + do + { + tmp = new File(log4jConfigFile.getPath() + r.nextInt() + ".tmp"); + } + while(tmp.exists()); + + tmp.deleteOnExit(); + + try + { + StreamResult result = new StreamResult(tmp); + transformer.transform(source, result); + } + catch (TransformerException e) + { + _logger.warn("Could not transform the XML into new file: " +e); + throw new IOException("Could not transform the XML into new file: " +e); + } + + // Swap temp file in to replace existing configuration file. + File old = new File(log4jConfigFile.getAbsoluteFile() + ".old"); + if (old.exists()) + { + old.delete(); + } + + if(!log4jConfigFile.renameTo(old)) + { + //unable to rename the existing file to the backup name + _logger.error("Could not backup the existing log4j XML file"); + throw new IOException("Could not backup the existing log4j XML file"); + } + + if(!tmp.renameTo(log4jConfigFile)) + { + //failed to rename the new file to the required filename + + if(!old.renameTo(log4jConfigFile)) + { + //unable to return the backup to required filename + _logger.error("Could not rename the new log4j configuration file into place, and unable to restore original file"); + throw new IOException("Could not rename the new log4j configuration file into place, and unable to restore original file"); + } + + _logger.error("Could not rename the new log4j configuration file into place"); + throw new IOException("Could not rename the new log4j configuration file into place"); + } + + return true; + } + finally + { + LOCK.unlock(); + } + } + + + /* The log4j XML configuration file DTD defines three possible element + * combinations for specifying optional logger+level settings. + * Must account for the following: + * + * <category name="x"> <priority value="y"/> </category> OR + * <category name="x"> <level value="y"/> </category> OR + * <logger name="x"> <level value="y"/> </logger> + * + * Noting also that the level/priority child element is optional too, + * and not the only possible child element. + */ + + public static synchronized Map<String,String> retrieveConfigFileLoggersLevels(String fileName) throws IOException + { + try + { + LOCK.lock(); + + Document doc = parseConfigFile(fileName); + + HashMap<String,String> loggerLevelList = new HashMap<String,String>(); + + //retrieve the 'category' element nodes + NodeList categoryElements = doc.getElementsByTagName("category"); + + String categoryName; + String priority = null; + + for (int i = 0; i < categoryElements.getLength(); i++) + { + Element categoryElement = (Element) categoryElements.item(i); + categoryName = categoryElement.getAttribute("name"); + + //retrieve the category's mandatory 'priority' or 'level' element's value. + //It may not be the only child node, so request by tag name. + NodeList priorityElements = categoryElement.getElementsByTagName("priority"); + NodeList levelElements = categoryElement.getElementsByTagName("level"); + + if (priorityElements.getLength() != 0) + { + Element priorityElement = (Element) priorityElements.item(0); + priority = priorityElement.getAttribute("value"); + } + else if (levelElements.getLength() != 0) + { + Element levelElement = (Element) levelElements.item(0); + priority = levelElement.getAttribute("value"); + } + else + { + //there is no exiting priority or level to view, move onto next category/logger + continue; + } + + loggerLevelList.put(categoryName, priority); + } + + //retrieve the 'logger' element nodes + NodeList loggerElements = doc.getElementsByTagName("logger"); + + String loggerName; + String level; + + for (int i = 0; i < loggerElements.getLength(); i++) + { + Element loggerElement = (Element) loggerElements.item(i); + loggerName = loggerElement.getAttribute("name"); + + //retrieve the logger's mandatory 'level' element's value + //It may not be the only child node, so request by tag name. + NodeList levelElements = loggerElement.getElementsByTagName("level"); + + Element levelElement = (Element) levelElements.item(0); + level = levelElement.getAttribute("value"); + + loggerLevelList.put(loggerName, level); + } + + return loggerLevelList; + } + finally + { + LOCK.unlock(); + } + } + + public synchronized TabularData viewConfigFileLoggerLevels() throws IOException + { + try + { + LOCK.lock(); + + if (_loggerLevelTabularType == null) + { + _logger.warn("TabluarData type not set up correctly"); + return null; + } + + _logger.info("Getting logger levels from log4j configuration file"); + + TabularData loggerLevelList = new TabularDataSupport(_loggerLevelTabularType); + + Map<String,String> levels = retrieveConfigFileLoggersLevels(_log4jConfigFileName); + + for (Map.Entry<String,String> entry : levels.entrySet()) + { + String loggerName = entry.getKey(); + String level = entry.getValue(); + + try + { + Object[] itemData = {loggerName, level.toUpperCase()}; + CompositeData loggerData = new CompositeDataSupport(_loggerLevelCompositeType, + COMPOSITE_ITEM_NAMES.toArray(new String[COMPOSITE_ITEM_NAMES.size()]), itemData); + loggerLevelList.put(loggerData); + } + catch (OpenDataException e) + { + _logger.warn("Unable to create logger level list due to :" + e); + return null; + } + } + + return loggerLevelList; + } + finally + { + LOCK.unlock(); + } + } + + public synchronized boolean setConfigFileLoggerLevel(String logger, String level) throws IOException + { + try + { + LOCK.lock(); + + //check that the specified level is a valid log4j Level + try + { + getLevel(level); + } + catch (Exception e) + { + //it isnt a valid level + return false; + } + + _logger.info("Setting level to " + level + " for logger '" + logger + + "' in log4j xml configuration file: " + _log4jConfigFileName); + + Document doc = parseConfigFile(_log4jConfigFileName); + + //retrieve the 'category' and 'logger' element nodes + NodeList categoryElements = doc.getElementsByTagName("category"); + NodeList loggerElements = doc.getElementsByTagName("logger"); + + //collect them into a single elements list + List<Element> logElements = new ArrayList<Element>(); + + for (int i = 0; i < categoryElements.getLength(); i++) + { + logElements.add((Element) categoryElements.item(i)); + } + for (int i = 0; i < loggerElements.getLength(); i++) + { + logElements.add((Element) loggerElements.item(i)); + } + + //try to locate the specified logger/category in the elements retrieved + Element logElement = null; + for (Element e : logElements) + { + if (e.getAttribute("name").equals(logger)) + { + logElement = e; + break; + } + } + + if (logElement == null) + { + //no loggers/categories with given name found, does not exist to update + _logger.warn("Specified logger does not exist in the configuration file: " +logger); + return false; + } + + //retrieve the optional 'priority' or 'level' sub-element value. + //It may not be the only child node, so request by tag name. + NodeList priorityElements = logElement.getElementsByTagName("priority"); + NodeList levelElements = logElement.getElementsByTagName("level"); + + Element levelElement = null; + if (priorityElements.getLength() != 0) + { + levelElement = (Element) priorityElements.item(0); + } + else if (levelElements.getLength() != 0) + { + levelElement = (Element) levelElements.item(0); + } + else + { + //there is no exiting priority or level element to update + return false; + } + + //update the element with the new level/priority + levelElement.setAttribute("value", level.toLowerCase()); + + //output the new file + return writeUpdatedConfigFile(_log4jConfigFileName, doc); + } + finally + { + LOCK.unlock(); + } + } + + + /* The log4j XML configuration file DTD defines 2 possible element + * combinations for specifying the optional root logger level settings + * Must account for the following: + * + * <root> <priority value="y"/> </root> OR + * <root> <level value="y"/> </root> + * + * Noting also that the level/priority child element is optional too, + * and not the only possible child element. + */ + + public static synchronized String retrieveConfigFileRootLoggerLevel(String fileName) throws IOException + { + try + { + LOCK.lock(); + + Document doc = parseConfigFile(fileName); + + //retrieve the optional 'root' element node + NodeList rootElements = doc.getElementsByTagName("root"); + + if (rootElements.getLength() == 0) + { + //there is no root logger definition + return "N/A"; + } + + Element rootElement = (Element) rootElements.item(0); + + //retrieve the optional 'priority' or 'level' element value. + //It may not be the only child node, so request by tag name. + NodeList priorityElements = rootElement.getElementsByTagName("priority"); + NodeList levelElements = rootElement.getElementsByTagName("level"); + String priority = null; + + if (priorityElements.getLength() != 0) + { + Element priorityElement = (Element) priorityElements.item(0); + priority = priorityElement.getAttribute("value"); + } + else if(levelElements.getLength() != 0) + { + Element levelElement = (Element) levelElements.item(0); + priority = levelElement.getAttribute("value"); + } + + if(priority != null) + { + return priority; + } + else + { + return "N/A"; + } + } + finally + { + LOCK.unlock(); + } + } + + public synchronized String getConfigFileRootLoggerLevel() throws IOException + { + return retrieveConfigFileRootLoggerLevel(_log4jConfigFileName).toUpperCase(); + } + + public synchronized boolean setConfigFileRootLoggerLevel(String level) throws IOException + { + try + { + LOCK.lock(); + + //check that the specified level is a valid log4j Level + try + { + Level newLevel = getLevel(level); + if(newLevel == null) + { + //A null Level reference implies inheritance. Setting the config file RootLogger + //to "null" or "inherited" just ensures it defaults to DEBUG at startup as Log4J + //prevents this catastrophic situation at startup and runtime anyway. + return false; + } + } + catch (Exception e) + { + //it isnt a valid level + return false; + } + + _logger.info("Setting level to " + level + " for the Root logger in " + + "log4j xml configuration file: " + _log4jConfigFileName); + + Document doc = parseConfigFile(_log4jConfigFileName); + + //retrieve the optional 'root' element node + NodeList rootElements = doc.getElementsByTagName("root"); + + if (rootElements.getLength() == 0) + { + return false; + } + + Element rootElement = (Element) rootElements.item(0); + + //retrieve the optional 'priority' or 'level' sub-element value. + //It may not be the only child node, so request by tag name. + NodeList priorityElements = rootElement.getElementsByTagName("priority"); + NodeList levelElements = rootElement.getElementsByTagName("level"); + + Element levelElement = null; + if (priorityElements.getLength() != 0) + { + levelElement = (Element) priorityElements.item(0); + } + else if (levelElements.getLength() != 0) + { + levelElement = (Element) levelElements.item(0); + } + else + { + //there is no exiting priority/level to update + return false; + } + + //update the element with the new level/priority + levelElement.setAttribute("value", level); + + //output the new file + return writeUpdatedConfigFile(_log4jConfigFileName, doc); + } + finally + { + LOCK.unlock(); + } + } + + public synchronized void reloadConfigFile() throws IOException + { + try + { + LOCK.lock(); + + QpidLog4JConfigurator.configure(_log4jConfigFileName); + _logger.info("Applied log4j configuration from: " + _log4jConfigFileName); + } + catch (IllegalLoggerLevelException e) + { + _logger.warn("The log4j configuration reload request was aborted: " + e); + //recommended that MBeans should use standard java.* and javax.* exceptions only + throw new IOException("The log4j configuration reload request was aborted: " + e.getMessage()); + } + catch (ParserConfigurationException e) + { + _logger.warn("The log4j configuration reload request was aborted: " + e); + throw new IOException("The log4j configuration reload request was aborted: " + e.getMessage()); + } + catch (SAXException e) + { + _logger.warn("The log4j configuration reload request was aborted: " + e); + //recommended that MBeans should use standard java.* and javax.* exceptions only + throw new IOException("The log4j configuration reload request was aborted: " + e.getMessage()); + } + catch (IOException e) + { + _logger.warn("The log4j configuration reload request was aborted: " + e); + throw new IOException("The log4j configuration reload request was aborted: " + e.getMessage()); + } + finally + { + LOCK.unlock(); + } + } + + @Override + public ManagedObject getParentObject() + { + return null; + } +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/MBeanUtils.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/MBeanUtils.java new file mode 100644 index 0000000000..97e84d4796 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/MBeanUtils.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx.mbeans; + +import javax.management.OperationsException; + +import org.apache.qpid.server.model.ConfiguredObjectFinder; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; + +public class MBeanUtils +{ + public static Queue findQueueFromQueueName(VirtualHost virtualHost, String queueName) throws OperationsException + { + Queue queue = ConfiguredObjectFinder.findConfiguredObjectByName(virtualHost.getQueues(), queueName); + if (queue == null) + { + throw new OperationsException("No such queue \""+queueName+"\""); + } + else + { + return queue; + } + } + + public static Exchange findExchangeFromExchangeName(VirtualHost virtualHost, String exchangeName) throws OperationsException + { + Exchange exchange = ConfiguredObjectFinder.findConfiguredObjectByName(virtualHost.getExchanges(), exchangeName); + if (exchange == null) + { + throw new OperationsException("No such exchange \""+exchangeName+"\""); + } + else + { + return exchange; + } + } +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java new file mode 100644 index 0000000000..1416cfdd89 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -0,0 +1,663 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.jmx.mbeans; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import javax.management.JMException; +import javax.management.MBeanNotificationInfo; +import javax.management.Notification; +import javax.management.ObjectName; +import javax.management.OperationsException; +import javax.management.monitor.MonitorNotification; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.server.jmx.AMQManagedObject; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.QueueNotificationListener; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.queue.NotificationCheck; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.QueueEntryVisitor; + +public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener +{ + private static final String[] VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC_ARRAY = + VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]); + + private static final OpenType[] MSG_ATTRIBUTE_TYPES; + private static final CompositeType MSG_DATA_TYPE; + private static final TabularType MSG_LIST_DATA_TYPE; + private static final CompositeType MSG_CONTENT_TYPE; + private static final String[] VIEW_MSG_COMPOSIT_ITEM_NAMES_ARRAY = VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.toArray( + new String[VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.size()]); + + static + { + + try + { + MSG_ATTRIBUTE_TYPES = new OpenType[] { + SimpleType.LONG, // For message id + new ArrayType(1, SimpleType.STRING), // For header attributes + SimpleType.LONG, // For size + SimpleType.BOOLEAN, // For redelivered + SimpleType.LONG, // For queue position + SimpleType.INTEGER // For delivery count} + }; + + MSG_DATA_TYPE = new CompositeType("Message", "AMQ Message", + VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC_ARRAY, + VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC_ARRAY, MSG_ATTRIBUTE_TYPES); + + MSG_LIST_DATA_TYPE = new TabularType("Messages", "List of messages", MSG_DATA_TYPE, + VIEW_MSGS_TABULAR_UNIQUE_INDEX.toArray(new String[VIEW_MSGS_TABULAR_UNIQUE_INDEX.size()])); + + OpenType[] msgContentAttrs = new OpenType[] { + SimpleType.LONG, // For message id + SimpleType.STRING, // For MimeType + SimpleType.STRING, // For MimeType + new ArrayType(SimpleType.BYTE, true) // For message content + }; + + + MSG_CONTENT_TYPE = new CompositeType("Message Content", "AMQ Message Content", + VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.size()]), + VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.size()]), + msgContentAttrs); + + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + private final Queue _queue; + private final VirtualHostMBean _vhostMBean; + + /** Date/time format used for message expiration and message timestamp formatting */ + public static final String JMSTIMESTAMP_DATETIME_FORMAT = "MM-dd-yy HH:mm:ss.SSS z"; + + private static final FastDateFormat FAST_DATE_FORMAT = FastDateFormat.getInstance(JMSTIMESTAMP_DATETIME_FORMAT); + + public QueueMBean(Queue queue, VirtualHostMBean virtualHostMBean) throws JMException + { + super(ManagedQueue.class, ManagedQueue.TYPE, virtualHostMBean.getRegistry()); + _queue = queue; + _vhostMBean = virtualHostMBean; + register(); + _queue.setNotificationListener(this); + } + + public ManagedObject getParentObject() + { + return _vhostMBean; + } + + public String getObjectInstanceName() + { + return ObjectName.quote(getName()); + } + + public String getName() + { + return _queue.getName(); + } + + public Integer getMessageCount() + { + return getStatisticValue(Queue.QUEUE_DEPTH_MESSAGES).intValue(); + } + + public Integer getMaximumDeliveryCount() + { + return (Integer) _queue.getAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS); + } + + public Long getReceivedMessageCount() + { + return getStatisticValue(Queue.TOTAL_ENQUEUED_MESSAGES).longValue(); + } + + public Long getQueueDepth() + { + return getStatisticValue(Queue.QUEUE_DEPTH_BYTES).longValue(); + } + + public Integer getActiveConsumerCount() + { + return getStatisticValue(Queue.CONSUMER_COUNT_WITH_CREDIT).intValue(); + } + + public Integer getConsumerCount() + { + return getStatisticValue(Queue.CONSUMER_COUNT).intValue(); + } + + public String getOwner() + { + return (String) _queue.getAttribute(Queue.OWNER); + } + + @Override + public String getQueueType() + { + return (String) _queue.getAttribute(Queue.TYPE); + } + + public boolean isDurable() + { + return _queue.isDurable(); + } + + public boolean isAutoDelete() + { + return _queue.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE; + } + + public Long getMaximumMessageAge() + { + return (Long) _queue.getAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE); + } + + public void setMaximumMessageAge(Long age) + { + _queue.setAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, getMaximumMessageAge(), age); + } + + public Long getMaximumMessageSize() + { + return (Long) _queue.getAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE); + } + + public void setMaximumMessageSize(Long size) + { + _queue.setAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, getMaximumMessageSize(), size); + } + + public Long getMaximumMessageCount() + { + return (Long) _queue.getAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES); + } + + public void setMaximumMessageCount(Long value) + { + _queue.setAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, getMaximumMessageCount(), value); + } + + public Long getMaximumQueueDepth() + { + return (Long) _queue.getAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES); + } + + public void setMaximumQueueDepth(Long value) + { + _queue.setAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, getMaximumQueueDepth(), value); + } + + public Long getCapacity() + { + return (Long) _queue.getAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES); + } + + public void setCapacity(Long value) + { + _queue.setAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, getCapacity(), value); + } + + public Long getFlowResumeCapacity() + { + return (Long) _queue.getAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES); + } + + public void setFlowResumeCapacity(Long value) + { + _queue.setAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, getFlowResumeCapacity(), value); + } + + public boolean isFlowOverfull() + { + return (Boolean)_queue.getAttribute(Queue.QUEUE_FLOW_STOPPED); + } + + public boolean isExclusive() + { + return (Boolean) _queue.getAttribute(Queue.EXCLUSIVE); + } + + public void setExclusive(boolean exclusive) + { + _queue.setAttribute(Queue.EXCLUSIVE, isExclusive(), exclusive); + } + + public void setAlternateExchange(String exchangeName) throws OperationsException + { + if (exchangeName == null || "".equals(exchangeName)) + { + _queue.setAttribute(Queue.ALTERNATE_EXCHANGE, getAlternateExchange(), null); + } + else + { + VirtualHost virtualHost = _queue.getParent(VirtualHost.class); + Exchange exchange = MBeanUtils.findExchangeFromExchangeName(virtualHost, exchangeName); + + _queue.setAttribute(Queue.ALTERNATE_EXCHANGE, getAlternateExchange(), exchange); + } + } + + public String getAlternateExchange() + { + Exchange alternateExchange = (Exchange) _queue.getAttribute(Queue.ALTERNATE_EXCHANGE); + return alternateExchange == null ? null : alternateExchange.getName(); + } + + public TabularData viewMessages(int fromIndex, int toIndex) + throws IOException, JMException + { + return viewMessages((long)fromIndex, (long)toIndex); + } + + public TabularData viewMessages(long startPosition, long endPosition) + throws IOException, JMException + { + if ((startPosition > endPosition) || (startPosition < 1)) + { + throw new OperationsException("From Index = " + startPosition + ", To Index = " + endPosition + + "\n\"From Index\" should be greater than 0 and less than \"To Index\""); + } + + if ((endPosition - startPosition) > Integer.MAX_VALUE) + { + throw new OperationsException("Specified MessageID interval is too large. Intervals must be less than 2^31 in size"); + } + + + List<QueueEntry> messages = getMessages(startPosition, endPosition); + + TabularDataSupport messageTable = new TabularDataSupport(MSG_LIST_DATA_TYPE); + + + // Create the tabular list of message header contents + long position = startPosition; + + for (QueueEntry queueEntry : messages) + { + ServerMessage serverMsg = queueEntry.getMessage(); + AMQMessageHeader header = serverMsg.getMessageHeader(); + String[] headerAttributes = + {"reply-to = " + header.getReplyTo(), + "propertyFlags = ", + "ApplicationID = " + header.getAppId(), + "ClusterID = ", + "UserId = " + header.getUserId(), + "JMSMessageID = " + header.getMessageId(), + "JMSCorrelationID = " + header.getCorrelationId(), + "JMSDeliveryMode = " + (serverMsg.isPersistent() ? "Persistent" : "Non_Persistent"), + "JMSPriority = " + header.getPriority(), + "JMSType = " + header.getType(), + "JMSExpiration = " + (header.getExpiration() == 0 ? null : FAST_DATE_FORMAT.format(header.getExpiration())), + "JMSTimestamp = " + (header.getTimestamp() == 0 ? null : FAST_DATE_FORMAT.format(header.getTimestamp())) + }; + + Object[] itemValues = new Object[]{ serverMsg.getMessageNumber(), + headerAttributes, + serverMsg.getSize(), + queueEntry.isRedelivered(), + position, + queueEntry.getDeliveryCount()}; + + position++; + + CompositeData messageData = + new CompositeDataSupport(MSG_DATA_TYPE, VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC_ARRAY, itemValues); + messageTable.put(messageData); + } + + return messageTable; + + } + + public CompositeData viewMessageContent(long messageId) + throws IOException, JMException + { + QueueEntry entry = getMessage(messageId); + if(entry == null) + { + throw new OperationsException("AMQMessage with message id = " + messageId + " is not in the " + _queue.getName()); + } + + ServerMessage serverMsg = entry.getMessage(); + final int bodySize = (int) serverMsg.getSize(); + + byte[] msgContent = new byte[bodySize]; + + ByteBuffer buf = ByteBuffer.wrap(msgContent); + int position = 0; + + while(position < bodySize) + { + position += serverMsg.getContent(buf, position); + + } + + AMQMessageHeader header = serverMsg.getMessageHeader(); + + String mimeType = null, encoding = null; + if (header != null) + { + mimeType = header.getMimeType(); + + encoding = header.getEncoding(); + } + + + Object[] itemValues = { messageId, mimeType, encoding, msgContent }; + + return new CompositeDataSupport(MSG_CONTENT_TYPE, VIEW_MSG_COMPOSIT_ITEM_NAMES_ARRAY, itemValues); + + + } + + private QueueEntry getMessage(long messageId) + { + GetMessageVisitor visitor = new GetMessageVisitor(messageId); + _queue.visit(visitor); + return visitor.getEntry(); + } + + public void deleteMessageFromTop() throws IOException, JMException + { + VirtualHost vhost = _queue.getParent(VirtualHost.class); + vhost.executeTransaction(new VirtualHost.TransactionalOperation() + { + public void withinTransaction(final VirtualHost.Transaction txn) + { + _queue.visit(new QueueEntryVisitor() + { + + public boolean visit(final QueueEntry entry) + { + if(entry.acquire()) + { + txn.dequeue(entry); + return true; + } + return false; + } + }); + + } + }); + + } + + public Long clearQueue() throws IOException, JMException + { + VirtualHost vhost = _queue.getParent(VirtualHost.class); + final AtomicLong count = new AtomicLong(); + + vhost.executeTransaction(new VirtualHost.TransactionalOperation() + { + public void withinTransaction(final VirtualHost.Transaction txn) + { + _queue.visit(new QueueEntryVisitor() + { + + public boolean visit(final QueueEntry entry) + { + final ServerMessage message = entry.getMessage(); + if(message != null) + { + txn.dequeue(entry); + count.incrementAndGet(); + + } + return false; + } + }); + + } + }); + return count.get(); + } + + public void moveMessages(final long fromMessageId, final long toMessageId, String toQueue) + throws IOException, JMException + { + if ((fromMessageId > toMessageId) || (fromMessageId < 1)) + { + throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); + } + + VirtualHost vhost = _queue.getParent(VirtualHost.class); + final Queue destinationQueue = MBeanUtils.findQueueFromQueueName(vhost, toQueue); + + vhost.executeTransaction(new VirtualHost.TransactionalOperation() + { + public void withinTransaction(final VirtualHost.Transaction txn) + { + _queue.visit(new QueueEntryVisitor() + { + + public boolean visit(final QueueEntry entry) + { + final ServerMessage message = entry.getMessage(); + if(message != null) + { + final long messageId = message.getMessageNumber(); + + if ((messageId >= fromMessageId) + && (messageId <= toMessageId)) + { + txn.move(entry, destinationQueue); + } + + } + return false; + } + }); + } + }); + } + + public void deleteMessages(final long fromMessageId, final long toMessageId) + throws IOException, JMException + { + VirtualHost vhost = _queue.getParent(VirtualHost.class); + vhost.executeTransaction(new VirtualHost.TransactionalOperation() + { + public void withinTransaction(final VirtualHost.Transaction txn) + { + _queue.visit(new QueueEntryVisitor() + { + + public boolean visit(final QueueEntry entry) + { + final ServerMessage message = entry.getMessage(); + if(message != null) + { + final long messageId = message.getMessageNumber(); + + if ((messageId >= fromMessageId) + && (messageId <= toMessageId)) + { + txn.dequeue(entry); + return true; + } + return false; + } + return true; + } + }); + } + }); + } + + public void copyMessages(final long fromMessageId, final long toMessageId, String toQueue) + throws IOException, JMException + { + if ((fromMessageId > toMessageId) || (fromMessageId < 1)) + { + throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); + } + + VirtualHost vhost = _queue.getParent(VirtualHost.class); + final Queue destinationQueue = MBeanUtils.findQueueFromQueueName(vhost, toQueue); + + vhost.executeTransaction(new VirtualHost.TransactionalOperation() + { + public void withinTransaction(final VirtualHost.Transaction txn) + { + _queue.visit(new QueueEntryVisitor() + { + + public boolean visit(final QueueEntry entry) + { + final ServerMessage message = entry.getMessage(); + if(message != null) + { + final long messageId = message.getMessageNumber(); + + if ((messageId >= fromMessageId) + && (messageId <= toMessageId)) + { + txn.copy(entry, destinationQueue); + } + + } + return false; + } + }); + } + }); + } + + private List<QueueEntry> getMessages(final long first, final long last) + { + final List<QueueEntry> messages = new ArrayList<QueueEntry>((int)(last-first)+1); + _queue.visit(new QueueEntryVisitor() + { + private long position = 1; + + public boolean visit(QueueEntry entry) + { + if(position >= first && position <= last) + { + messages.add(entry); + } + position++; + return position > last; + } + }); + return messages; + } + + + private static class GetMessageVisitor implements QueueEntryVisitor + { + + private final long _messageNumber; + private QueueEntry _entry; + + public GetMessageVisitor(long messageId) + { + _messageNumber = messageId; + } + + public boolean visit(QueueEntry entry) + { + if(entry.getMessage().getMessageNumber() == _messageNumber) + { + _entry = entry; + return true; + } + return false; + } + + public QueueEntry getEntry() + { + return _entry; + } + } + + @Override + public void notifyClients(NotificationCheck notification, Queue queue, String notificationMsg) + { + notificationMsg = notification.name() + " " + notificationMsg; + + Notification note = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, + incrementAndGetSequenceNumber(), System.currentTimeMillis(), notificationMsg); + + getBroadcaster().sendNotification(note); + } + + /** + * returns Notifications sent by this MBean. + */ + @Override + public MBeanNotificationInfo[] getNotificationInfo() + { + String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED }; + String name = MonitorNotification.class.getName(); + String description = "Either Message count or Queue depth or Message size has reached threshold high value"; + MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); + + return new MBeanNotificationInfo[] { info1 }; + } + + @Override + public String getDescription() + { + return (String) _queue.getAttribute(Queue.DESCRIPTION); + } + + @Override + public void setDescription(String description) + { + _queue.setAttribute(Queue.DESCRIPTION, getDescription(), description); + } + + private Number getStatisticValue(String name) + { + final Number statistic = (Number) _queue.getStatistics().getStatistic(name); + return statistic == null ? Integer.valueOf(0) : statistic; + } +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ServerInformationMBean.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ServerInformationMBean.java new file mode 100644 index 0000000000..597b98ccaa --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ServerInformationMBean.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx.mbeans; + +import java.io.IOException; + +import javax.management.JMException; +import javax.management.NotCompliantMBeanException; + +import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.management.common.mbeans.ServerInformation; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.model.Broker; + +@MBeanDescription("Server Information Interface") +public class ServerInformationMBean extends AbstractStatisticsGatheringMBean<Broker> implements ServerInformation +{ + private String _buildVersion; + private String _productVersion; + + public ServerInformationMBean(ManagedObjectRegistry registry, Broker broker) + throws NotCompliantMBeanException, JMException + { + super(ServerInformation.class, ServerInformation.TYPE, registry, broker); + + _buildVersion = QpidProperties.getBuildVersion(); + _productVersion = QpidProperties.getReleaseVersion(); + + register(); + } + + @Override + public String getObjectInstanceName() + { + return ServerInformation.TYPE; + } + + @Override + public Integer getManagementApiMajorVersion() throws IOException + { + return QPID_JMX_API_MAJOR_VERSION; + } + + @Override + public Integer getManagementApiMinorVersion() throws IOException + { + return QPID_JMX_API_MINOR_VERSION; + } + + @Override + public String getBuildVersion() throws IOException + { + return _buildVersion; + } + + @Override + public String getProductVersion() throws IOException + { + return _productVersion; + } + + @Override + public boolean isStatisticsEnabled() + { + return false; + } + + @Override + public ManagedObject getParentObject() + { + // does not have a parent + return null; + } +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/Shutdown.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/Shutdown.java new file mode 100644 index 0000000000..62733168ef --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/Shutdown.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx.mbeans; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.jmx.DefaultManagedObject; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.jmx.ManagedObjectRegistry; + +import javax.management.JMException; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Implementation of the JMX broker shutdown plugin. + */ +public class Shutdown extends DefaultManagedObject implements ShutdownMBean +{ + + private static final Logger _logger = Logger.getLogger(Shutdown.class); + + private static final String FORMAT = "yyyy/MM/dd HH:mm:ss"; + private static final int THREAD_COUNT = 1; + private static final ScheduledExecutorService EXECUTOR = new ScheduledThreadPoolExecutor(THREAD_COUNT); + + private final Runnable _shutdown = new SystemExiter(); + + public Shutdown(ManagedObjectRegistry registry) throws JMException + { + super(ShutdownMBean.class, ShutdownMBean.TYPE, registry); + register(); + } + + /** @see ShutdownMBean#shutdown() */ + public void shutdown() + { + _logger.info("Shutting down at user's request"); + shutdownBroker(0); + } + + /** @see ShutdownMBean#shutdown(long) */ + public void shutdown(final long delay) + { + if (delay < 0) + { + _logger.info("Shutting down at user's request"); + shutdownBroker(0); + } + else + { + _logger.info("Scheduled broker shutdown after " + delay + "ms"); + shutdownBroker(delay); + } + } + + /** @see ShutdownMBean#shutdownAt(String) */ + public void shutdownAt(final String when) + { + Date date; + DateFormat df = new SimpleDateFormat(FORMAT); + try + { + date = df.parse(when); + } + catch (ParseException e) + { + _logger.error("Invalid date \"" + when + "\": expecting " + FORMAT, e); + return; + } + _logger.info("Scheduled broker shutdown at " + when); + long now = System.currentTimeMillis(); + long time = date.getTime(); + if (time > now) + { + shutdownBroker(time - now); + } + else + { + shutdownBroker(0); + } + } + + /** + * Submits the {@link SystemExiter} job to shutdown the broker. + */ + private void shutdownBroker(long delay) + { + EXECUTOR.schedule(_shutdown, delay, TimeUnit.MILLISECONDS); + } + + @Override + public ManagedObject getParentObject() + { + return null; + } + + /** + * Shutting down the system in another thread to avoid JMX exceptions being thrown. + */ + class SystemExiter implements Runnable + { + public void run() + { + System.exit(0); + } + } + + /** + * @see ManagedObject#getObjectInstanceName() + */ + public String getObjectInstanceName() + { + return "Shutdown"; + } +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ShutdownMBean.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ShutdownMBean.java new file mode 100644 index 0000000000..ed69c351f7 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ShutdownMBean.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx.mbeans; + +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter; + +import javax.management.MBeanOperationInfo; + +/** + * Shutdown plugin JMX MBean interface. + * + * Shuts the Qpid broker down via JMX. + */ +public interface ShutdownMBean +{ + static final String TYPE = "Shutdown"; + + /** + * Broker will be shut down immediately. + */ + @MBeanOperation(name="shutdown", description="Shut down immediately", impact = MBeanOperationInfo.ACTION) + public void shutdown(); + + /** + * Broker will be shutdown after the specified delay + * + * @param delay the number of ms to wait + */ + @MBeanOperation(name="shutdown", description="Shutdown after the specified delay (ms)", impact = MBeanOperationInfo.ACTION) + public void shutdown(@MBeanOperationParameter(name = "when", description = "delay (ms)") long delay); + + /** + * Broker will be shutdown at the specified date and time. + * + * @param when the date and time to shutdown + */ + @MBeanOperation(name="shutdownAt", description="Shutdown at the specified date and time (yyyy/MM/dd HH:mm:ss)", impact = MBeanOperationInfo.ACTION) + public void shutdownAt(@MBeanOperationParameter(name = "when", + description = "shutdown date/time (yyyy/MM/dd HH:mm:ss)") String when); +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/UserManagementMBean.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/UserManagementMBean.java new file mode 100644 index 0000000000..c7aade34b4 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/UserManagementMBean.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.jmx.mbeans; + +import org.apache.log4j.Logger; + +import org.apache.qpid.management.common.mbeans.UserManagement; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.server.jmx.AMQManagedObject; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.model.PasswordCredentialManagingAuthenticationProvider; + +import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import javax.security.auth.login.AccountNotFoundException; + +import java.io.IOException; +import java.util.Map; + +@MBeanDescription("User Management Interface") +public class UserManagementMBean extends AMQManagedObject implements UserManagement +{ + private static final Logger _logger = Logger.getLogger(UserManagementMBean.class); + + private PasswordCredentialManagingAuthenticationProvider _authProvider; + + // Setup for the TabularType + private static final TabularType _userlistDataType; // Datatype for representing User Lists + private static final CompositeType _userDataType; // Composite type for representing User + + static + { + OpenType[] userItemTypes = new OpenType[4]; // User item types. + userItemTypes[0] = SimpleType.STRING; // For Username + userItemTypes[1] = SimpleType.BOOLEAN; // For Rights - Read - No longer in use + userItemTypes[2] = SimpleType.BOOLEAN; // For Rights - Write - No longer in use + userItemTypes[3] = SimpleType.BOOLEAN; // For Rights - Admin - No longer is use + + try + { + _userDataType = + new CompositeType("User", "User Data", COMPOSITE_ITEM_NAMES.toArray(new String[COMPOSITE_ITEM_NAMES.size()]), + COMPOSITE_ITEM_DESCRIPTIONS.toArray(new String[COMPOSITE_ITEM_DESCRIPTIONS.size()]), userItemTypes); + + _userlistDataType = new TabularType("Users", "List of users", _userDataType, TABULAR_UNIQUE_INDEX.toArray(new String[TABULAR_UNIQUE_INDEX.size()])); + } + catch (OpenDataException e) + { + _logger.error("Tabular data setup for viewing users incorrect.", e); + throw new ExceptionInInitializerError("Tabular data setup for viewing users incorrect"); + } + } + + public UserManagementMBean(PasswordCredentialManagingAuthenticationProvider provider, ManagedObjectRegistry registry) throws JMException + { + super(UserManagement.class, UserManagement.TYPE, registry); + register(); + _authProvider = provider; + } + + @Override + public String getObjectInstanceName() + { + return UserManagement.TYPE; + } + + @Override + public boolean setPassword(String username, String password) + { + try + { + _authProvider.setPassword(username, password); + } + catch (AccountNotFoundException e) + { + _logger.warn("Attempt to set password of non-existent user '" + username + "'"); + return false; + } + return true; + } + + @Override + public boolean createUser(String username, String password) + { + return _authProvider.createUser(username, password, null); + } + + @Override + public boolean deleteUser(String username) + { + try + { + _authProvider.deleteUser(username); + } + catch (AccountNotFoundException e) + { + _logger.warn("Attempt to delete user (" + username + ") that doesn't exist"); + return false; + } + + return true; + } + + @Override + public boolean reloadData() + { + try + { + _authProvider.reload(); + return true; + } + catch (IOException e) + { + _logger.error("Unable to reload user data", e); + return false; + } + } + + @Override + public TabularData viewUsers() + { + Map<String, Map<String, String>> users = _authProvider.getUsers(); + + TabularDataSupport userList = new TabularDataSupport(_userlistDataType); + + try + { + // Create the tabular list of message header contents + for (String user : users.keySet()) + { + // Create header attributes list + // Read,Write,Admin items are deprecated and we return always false. + Object[] itemData = {user, false, false, false}; + CompositeData messageData = new CompositeDataSupport(_userDataType, COMPOSITE_ITEM_NAMES.toArray(new String[COMPOSITE_ITEM_NAMES.size()]), itemData); + userList.put(messageData); + } + } + catch (OpenDataException e) + { + _logger.warn("Unable to create user list due to :", e); + return null; + } + + return userList; + } + + @Override + public ManagedObject getParentObject() + { + return null; + } +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java new file mode 100644 index 0000000000..85f53d9c0d --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java @@ -0,0 +1,208 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.jmx.mbeans; + +import org.apache.qpid.server.jmx.AMQManagedObject; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.virtualhost.ManagedVirtualHost; + +import javax.management.JMException; +import javax.management.ObjectName; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost, ConfigurationChangeListener +{ + private final VirtualHost _virtualHost; + + private final Map<ConfiguredObject, AMQManagedObject> _children = + new HashMap<ConfiguredObject, AMQManagedObject>(); + private VirtualHostManagerMBean _managerMBean; + + public VirtualHostMBean(VirtualHost virtualHost, ManagedObjectRegistry registry) throws JMException + { + super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE, registry); + _virtualHost = virtualHost; + virtualHost.addChangeListener(this); + + initQueues(); + initExchanges(); + initConnections(); + + //This is the actual JMX bean for this 'VirtualHostMBean', leave it alone. + _managerMBean = new VirtualHostManagerMBean(this); + } + + private void initQueues() throws JMException + { + synchronized (_children) + { + for(Queue queue : _virtualHost.getQueues()) + { + if(!_children.containsKey(queue)) + { + _children.put(queue, new QueueMBean(queue, this)); + } + } + } + } + + private void initExchanges() throws JMException + { + synchronized (_children) + { + for(Exchange exchange : _virtualHost.getExchanges()) + { + if(!_children.containsKey(exchange)) + { + _children.put(exchange, new ExchangeMBean(exchange, this)); + } + } + } + } + + private void initConnections() throws JMException + { + synchronized (_children) + { + for(Connection conn : _virtualHost.getConnections()) + { + if(!_children.containsKey(conn)) + { + _children.put(conn, new ConnectionMBean(conn, this)); + } + } + } + } + + public String getObjectInstanceName() + { + return ObjectName.quote(_virtualHost.getName()); + } + + public String getName() + { + return _virtualHost.getName(); + } + + public void stateChanged(ConfiguredObject object, State oldState, State newState) + { + // ignore + } + + public void childAdded(ConfiguredObject object, ConfiguredObject child) + { + synchronized (_children) + { + try + { + if(child instanceof Queue) + { + QueueMBean queueMB = new QueueMBean((Queue)child, this); + _children.put(child, queueMB); + + } + else if(child instanceof Exchange) + { + ExchangeMBean exchangeMBean = new ExchangeMBean((Exchange)child, this); + _children.put(child, exchangeMBean); + + } + else if(child instanceof Connection) + { + ConnectionMBean connectionMBean = new ConnectionMBean((Connection)child, this); + _children.put(child, connectionMBean); + + } + else + { + // TODO + } + + } + catch(JMException e) + { + e.printStackTrace(); //TODO - report error on adding child MBean + } + } + } + + public void childRemoved(ConfiguredObject object, ConfiguredObject child) + { + synchronized (_children) + { + AMQManagedObject mbean = _children.remove(child); + if(mbean != null) + { + try + { + mbean.unregister(); + } + catch(JMException e) + { + e.printStackTrace(); //TODO - report error on removing child MBean + } + } + } + } + + @Override + public ManagedObject getParentObject() + { + return null; + } + + protected VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public Collection<QueueMBean> getQueues() + { + Collection<AMQManagedObject> children; + synchronized (_children) + { + children = new ArrayList<AMQManagedObject>(_children.values()); + } + Collection<QueueMBean> queues = new ArrayList<QueueMBean>(); + + for(AMQManagedObject child : children) + { + if(child instanceof QueueMBean) + { + queues.add((QueueMBean) child); + } + } + + return queues; + } +} diff --git a/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java new file mode 100644 index 0000000000..9d12d8a493 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java @@ -0,0 +1,240 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.jmx.mbeans; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.OperationsException; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQInvalidArgumentException; +import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.queue.AMQQueueFactory; + +@MBeanDescription("This MBean exposes the broker level management features") +public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<VirtualHost> implements ManagedBroker +{ + private static final Logger LOGGER = Logger.getLogger(VirtualHostManagerMBean.class); + + private static final boolean _moveNonExclusiveQueueOwnerToDescription = Boolean.parseBoolean(System.getProperty("qpid.move_non_exclusive_queue_owner_to_description", Boolean.TRUE.toString())); + + private final VirtualHostMBean _virtualHostMBean; + + @MBeanConstructor("Creates the Broker Manager MBean") + public VirtualHostManagerMBean(VirtualHostMBean virtualHostMBean) throws JMException + { + super(ManagedBroker.class, ManagedBroker.TYPE, virtualHostMBean.getRegistry(), virtualHostMBean.getVirtualHost()); + _virtualHostMBean = virtualHostMBean; + register(); + } + + + @Override + public String getObjectInstanceName() + { + return ObjectName.quote(_virtualHostMBean.getName()); + } + + @Override + public ManagedObject getParentObject() + { + return _virtualHostMBean; + } + + @Override + public String[] getExchangeTypes() throws IOException + { + Collection<String> exchangeTypes = _virtualHostMBean.getVirtualHost().getExchangeTypes(); + return exchangeTypes.toArray(new String[exchangeTypes.size()]); + } + + @Override + public List<String> retrieveQueueAttributeNames() throws IOException + { + return ManagedQueue.QUEUE_ATTRIBUTES; + } + + @Override + public List<List<Object>> retrieveQueueAttributeValues( + @MBeanOperationParameter(name = "attributes", description = "Attributes to retrieve") String[] attributes) + throws IOException + { + int attributesLength = attributes.length; + + List<List<Object>> queueAttributesList = new ArrayList<List<Object>>(); + + for(QueueMBean queue : _virtualHostMBean.getQueues()) + { + + if(queue == null) + { + continue; + } + + List<Object> attributeValues = new ArrayList<Object>(attributesLength); + + for(int i=0; i < attributesLength; i++) + { + try + { + attributeValues.add(queue.getAttribute(attributes[i])); + } + catch (Exception e) + { + attributeValues.add("-"); + } + } + + queueAttributesList.add(attributeValues); + } + + return queueAttributesList; + + } + + @Override + public void createNewExchange(String name, String type, boolean durable) + throws IOException, JMException, MBeanException + { + if (!getConfiguredObject().getExchangeTypes().contains(type)) + { + throw new OperationsException("No such exchange type \""+type+"\""); + } + + try + { + getConfiguredObject().createExchange(name, State.ACTIVE, durable, + LifetimePolicy.PERMANENT, 0l, type, Collections.EMPTY_MAP); + } + catch (IllegalArgumentException iae) + { + JMException jme = new JMException(iae.toString()); + throw new MBeanException(jme, "Error in creating exchange " + name); + } + + } + + @Override + public void unregisterExchange(String exchangeName) + throws IOException, JMException, MBeanException + { + Exchange theExchange = MBeanUtils.findExchangeFromExchangeName(_virtualHostMBean.getVirtualHost(), exchangeName); + try + { + theExchange.delete(); + } + catch (IllegalStateException ex) + { + final JMException jme = new JMException(ex.toString()); + throw new MBeanException(jme, "Error in unregistering exchange " + exchangeName); + } + } + + @Override + public void createNewQueue(String queueName, String owner, boolean durable) + throws IOException, JMException, MBeanException + { + createNewQueue(queueName, owner, durable, Collections.EMPTY_MAP); + } + + @Override + public void createNewQueue(String queueName, String owner, boolean durable, Map<String, Object> originalArguments) + throws IOException, JMException + { + final Map<String, Object> createArgs = processNewQueueArguments(queueName, owner, originalArguments); + getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l, createArgs); + } + + + /** + * Some users have been abusing the owner field to store a queue description. As the owner field + * only makes sense if exclusive=true, and it is currently impossible to create an exclusive queue via + * the JMX interface, if the user specifies a owner, then we assume that they actually mean to pass a description. + */ + private Map<String, Object> processNewQueueArguments(String queueName, + String owner, Map<String, Object> arguments) + { + final Map<String, Object> argumentsCopy; + if (_moveNonExclusiveQueueOwnerToDescription && owner != null) + { + argumentsCopy = new HashMap<String, Object>(arguments == null ? new HashMap<String, Object>() : arguments); + if (!argumentsCopy.containsKey(AMQQueueFactory.X_QPID_DESCRIPTION)) + { + LOGGER.warn("Non-exclusive owner " + owner + " for new queue " + queueName + " moved to " + AMQQueueFactory.X_QPID_DESCRIPTION); + + argumentsCopy.put(AMQQueueFactory.X_QPID_DESCRIPTION, owner); + } + else + { + LOGGER.warn("Non-exclusive owner " + owner + " for new queue " + queueName + " ignored."); + } + } + else + { + argumentsCopy = arguments; + } + return argumentsCopy; + } + + @Override + public void deleteQueue( + @MBeanOperationParameter(name = ManagedQueue.TYPE, description = "Queue Name") String queueName) + throws IOException, JMException, MBeanException + { + Queue theQueue = MBeanUtils.findQueueFromQueueName(_virtualHostMBean.getVirtualHost(), queueName); + theQueue.delete(); + } + + @Override + public ObjectName getObjectName() throws MalformedObjectNameException + { + return getObjectNameForSingleInstanceMBean(); + } + + public synchronized boolean isStatisticsEnabled() + { + updateStats(); + return false; //TODO - implement isStatisticsEnabled + } + +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/NoopManagedObjectRegistry.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/NoopManagedObjectRegistry.java new file mode 100644 index 0000000000..a2631bab7f --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/NoopManagedObjectRegistry.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx; + +import javax.management.JMException; + +public class NoopManagedObjectRegistry implements ManagedObjectRegistry +{ + public NoopManagedObjectRegistry() + { + } + + public void start() + { + } + + public void registerObject(ManagedObject managedObject) throws JMException + { + } + + public void unregisterObject(ManagedObject managedObject) throws JMException + { + } + + public void close() + { + } +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBeanTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBeanTest.java new file mode 100644 index 0000000000..f97c5a7210 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBeanTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.jmx.mbeans; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.Date; + +import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import junit.framework.TestCase; + +import org.apache.qpid.management.common.mbeans.ManagedConnection; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.Session; +import org.apache.qpid.server.model.Statistics; + +public class ConnectionMBeanTest extends TestCase +{ + private ConnectionMBean _connectionMBean; + private Connection _mockConnection; + private VirtualHostMBean _mockVirtualHostMBean; + private ManagedObjectRegistry _mockManagedObjectRegistry; + + @Override + protected void setUp() throws Exception + { + _mockConnection = mock(Connection.class); + _mockVirtualHostMBean = mock(VirtualHostMBean.class); + + _mockManagedObjectRegistry = mock(ManagedObjectRegistry.class); + when(_mockVirtualHostMBean.getRegistry()).thenReturn(_mockManagedObjectRegistry); + + _connectionMBean = new ConnectionMBean(_mockConnection, _mockVirtualHostMBean); + } + + public void testMBeanRegistersItself() throws Exception + { + ConnectionMBean connectionMBean = new ConnectionMBean(_mockConnection, _mockVirtualHostMBean); + verify(_mockManagedObjectRegistry).registerObject(connectionMBean); + } + + public void testCloseConnection() throws Exception + { + _connectionMBean.closeConnection(); + verify(_mockConnection).delete(); + } + + public void testCommitTransactions() + { + try + { + _connectionMBean.commitTransactions(0); + fail("Exception not thrown"); + } + catch(JMException e) + { + assertTrue("Cause should be an UnsupportedOperationException", e.getCause() instanceof UnsupportedOperationException); + } + } + + public void testRollbackTransactions() + { + try + { + _connectionMBean.rollbackTransactions(0); + fail("Exception not thrown"); + } + catch(JMException e) + { + assertTrue("Cause should be an UnsupportedOperationException", e.getCause() instanceof UnsupportedOperationException); + } + } + + public void testChannelsWithSingleTransactionalSession() throws Exception + { + int channelId = 10; + int unacknowledgedMessages = 2; + long localTransactionBegins = 1; + boolean transactional = true; + boolean blocked = false; + + Session mockSession = createMockedSession(channelId, unacknowledgedMessages, localTransactionBegins, blocked); + + when(_mockConnection.getSessions()).thenReturn(Collections.singletonList(mockSession)); + + TabularData table = _connectionMBean.channels(); + assertEquals("Unexpected number of rows in table", 1, table.size()); + + final CompositeData row = table.get(new Integer[] {channelId} ); + assertChannelRow(row, channelId, unacknowledgedMessages, transactional, blocked); + } + + public void testChannelsWithSingleNonTransactionalSession() throws Exception + { + int channelId = 10; + int unacknowledgedMessages = 2; + long localTransactionBegins = 0; + boolean transactional = false; + boolean blocked = false; + + Session mockSession = createMockedSession(channelId, unacknowledgedMessages, localTransactionBegins, blocked); + + when(_mockConnection.getSessions()).thenReturn(Collections.singletonList(mockSession)); + + TabularData table = _connectionMBean.channels(); + assertEquals("Unexpected number of rows in table", 1, table.size()); + + final CompositeData row = table.get(new Integer[] {channelId} ); + assertChannelRow(row, channelId, unacknowledgedMessages, transactional, blocked); + } + + public void testChannelsWithSessionBlocked() throws Exception + { + int channelId = 10; + int unacknowledgedMessages = 2; + long localTransactionBegins = 0; + boolean transactional = false; + boolean blocked = true; + + Session mockSession = createMockedSession(channelId, unacknowledgedMessages, localTransactionBegins, blocked); + + when(_mockConnection.getSessions()).thenReturn(Collections.singletonList(mockSession)); + + TabularData table = _connectionMBean.channels(); + assertEquals("Unexpected number of rows in table", 1, table.size()); + + final CompositeData row = table.get(new Integer[] {channelId} ); + assertChannelRow(row, channelId, unacknowledgedMessages, transactional, blocked); + } + + public void testParentObjectIsVirtualHost() + { + ManagedObject parent = _connectionMBean.getParentObject(); + assertEquals(_mockVirtualHostMBean, parent); + } + + public void testGetObjectInstanceName() + { + String remoteAddress = "testRemoteAddress"; + String quotedRemoteAddress = "\"testRemoteAddress\""; + when(_mockConnection.getAttribute(Connection.REMOTE_ADDRESS)).thenReturn(remoteAddress); + String objectInstanceName = _connectionMBean.getObjectInstanceName(); + assertEquals(quotedRemoteAddress, objectInstanceName); + } + + public void testGetAuthorizedId() throws Exception + { + assertAttribute("authorizedId", "testAuthorizedId", Connection.PRINCIPAL); + } + + public void testGetVersion() throws Exception + { + assertAttribute("version", "testVersion", Connection.CLIENT_VERSION); + } + + public void testGetRemoteAddress() throws Exception + { + assertAttribute("remoteAddress", "testRemoteAddress", Connection.REMOTE_ADDRESS); + } + + public void testGetLastIoTime() + { + Statistics mockStatistics = mock(Statistics.class); + when(_mockConnection.getStatistics()).thenReturn(mockStatistics); + when(mockStatistics.getStatistic(Connection.LAST_IO_TIME)).thenReturn(1L); + + Object actualValue = _connectionMBean.getLastIoTime(); + assertEquals("Unexpected lastIoTime", new Date(1L), actualValue); + } + + public void testGetMaximumNumberOfChannels() throws Exception + { + assertAttribute("maximumNumberOfChannels", 10l, Connection.SESSION_COUNT_LIMIT); + } + + public void testIsStatisticsEnabledAlwaysTrue() throws Exception + { + assertTrue(_connectionMBean.isStatisticsEnabled()); + } + + private void assertAttribute(String jmxAttributeName, Object expectedValue, String underlyingAttributeName) throws Exception + { + when(_mockConnection.getAttribute(underlyingAttributeName)).thenReturn(expectedValue); + MBeanTestUtils.assertMBeanAttribute(_connectionMBean, jmxAttributeName, expectedValue); + } + + private void assertChannelRow(final CompositeData row, int channelId, int unacknowledgedMessages, boolean isTransactional, boolean flowBlocked) + { + assertNotNull("No row for channel id " + channelId, row); + assertEquals("Unexpected channel id", channelId, row.get(ManagedConnection.CHAN_ID)); + assertEquals("Unexpected transactional flag", isTransactional, row.get(ManagedConnection.TRANSACTIONAL)); + assertEquals("Unexpected unacknowledged message count", unacknowledgedMessages, row.get(ManagedConnection.UNACKED_COUNT)); + assertEquals("Unexpected flow blocked", flowBlocked, row.get(ManagedConnection.FLOW_BLOCKED)); + } + + private Session createMockedSession(int channelId, int unacknowledgedMessages, long localTransactionBegins, boolean blocked) + { + Session mockSession = mock(Session.class); + Statistics mockSessionStatistics = mock(Statistics.class); + when(mockSessionStatistics.getStatistic(Session.LOCAL_TRANSACTION_BEGINS)).thenReturn(localTransactionBegins); + when(mockSessionStatistics.getStatistic(Session.UNACKNOWLEDGED_MESSAGES)).thenReturn(unacknowledgedMessages); + + when(mockSession.getStatistics()).thenReturn(mockSessionStatistics); + when(mockSession.getAttribute(Session.CHANNEL_ID)).thenReturn(channelId); + when(mockSession.getAttribute(Session.PRODUCER_FLOW_BLOCKED)).thenReturn(blocked); + return mockSession; + } +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBeanTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBeanTest.java new file mode 100644 index 0000000000..64b942c9a9 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBeanTest.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.jmx.mbeans; + +import static org.mockito.Mockito.*; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.util.InternalBrokerBaseCase; + +import static org.apache.qpid.management.common.mbeans.LoggingManagement.LOGGER_LEVEL; +import static org.apache.qpid.management.common.mbeans.LoggingManagement.LOGGER_NAME; + +import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularDataSupport; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class LoggingManagementMBeanTest extends InternalBrokerBaseCase +{ + + private static final String TEST_LOGGER = "LoggingManagementMBeanTestLogger"; + private static final String TEST_LOGGER_CHILD1 = "LoggingManagementMBeanTestLogger.child1"; + private static final String TEST_LOGGER_CHILD2 = "LoggingManagementMBeanTestLogger.child2"; + + private static final String TEST_CATEGORY_PRIORITY = "LogManMBeanTest.category.priority"; + private static final String TEST_CATEGORY_LEVEL = "LogManMBeanTest.category.level"; + private static final String TEST_LOGGER_LEVEL = "LogManMBeanTest.logger.level"; + + private static final String NEWLINE = System.getProperty("line.separator"); + + private File _testConfigFile; + + private ManagedObjectRegistry _registry = mock(ManagedObjectRegistry.class); + + @Override + public void setUp() throws Exception + { + super.setUp(); + _testConfigFile = createTempTestLog4JConfig(); + } + + @Override + public void tearDown() throws Exception + { + File oldTestConfigFile = new File(_testConfigFile.getAbsolutePath() + ".old"); + if(oldTestConfigFile.exists()) + { + oldTestConfigFile.delete(); + } + + _testConfigFile.delete(); + + super.tearDown(); + } + + private File createTempTestLog4JConfig() + { + File tmpFile = null; + try + { + tmpFile = File.createTempFile("LogManMBeanTestLog4jConfig", ".tmp"); + tmpFile.deleteOnExit(); + + FileWriter fstream = new FileWriter(tmpFile); + BufferedWriter writer = new BufferedWriter(fstream); + + writer.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"+NEWLINE); + writer.write("<!DOCTYPE log4j:configuration SYSTEM \"log4j.dtd\">"+NEWLINE); + + writer.write("<log4j:configuration xmlns:log4j=\"http://jakarta.apache.org/log4j/\" debug=\"null\" " + + "threshold=\"null\">"+NEWLINE); + + writer.write(" <appender class=\"org.apache.log4j.ConsoleAppender\" name=\"STDOUT\">"+NEWLINE); + writer.write(" <layout class=\"org.apache.log4j.PatternLayout\">"+NEWLINE); + writer.write(" <param name=\"ConversionPattern\" value=\"%d %-5p [%t] %C{2} (%F:%L) - %m%n\"/>"+NEWLINE); + writer.write(" </layout>"+NEWLINE); + writer.write(" </appender>"+NEWLINE); + + //Example of a 'category' with a 'priority' + writer.write(" <category additivity=\"true\" name=\"" + TEST_CATEGORY_PRIORITY +"\">"+NEWLINE); + writer.write(" <priority value=\"info\"/>"+NEWLINE); + writer.write(" <appender-ref ref=\"STDOUT\"/>"+NEWLINE); + writer.write(" </category>"+NEWLINE); + + //Example of a 'category' with a 'level' + writer.write(" <category additivity=\"true\" name=\"" + TEST_CATEGORY_LEVEL +"\">"+NEWLINE); + writer.write(" <level value=\"warn\"/>"+NEWLINE); + writer.write(" <appender-ref ref=\"STDOUT\"/>"+NEWLINE); + writer.write(" </category>"+NEWLINE); + + //Example of a 'logger' with a 'level' + writer.write(" <logger additivity=\"true\" name=\"" + TEST_LOGGER_LEVEL + "\">"+NEWLINE); + writer.write(" <level value=\"error\"/>"+NEWLINE); + writer.write(" <appender-ref ref=\"STDOUT\"/>"+NEWLINE); + writer.write(" </logger>"+NEWLINE); + + //'root' logger + writer.write(" <root>"+NEWLINE); + writer.write(" <priority value=\"info\"/>"+NEWLINE); + writer.write(" <appender-ref ref=\"STDOUT\"/>"+NEWLINE); + writer.write(" </root>"+NEWLINE); + + writer.write("</log4j:configuration>"+NEWLINE); + + writer.flush(); + writer.close(); + } + catch (IOException e) + { + fail("Unable to create temporary test log4j configuration"); + } + + return tmpFile; + } + + + + //******* Test Methods ******* // + + public void testSetRuntimeLoggerLevel() + { + LoggingManagementMBean lm = null; + try + { + lm = new LoggingManagementMBean(_testConfigFile.getAbsolutePath(), 0, _registry); + } + catch (JMException e) + { + fail("Could not create test LoggingManagementMBean"); + } + + //create a parent test logger, set its level explicitly + Logger log = Logger.getLogger(TEST_LOGGER); + log.setLevel(Level.toLevel("info")); + + //create child1 test logger, check its *effective* level is the same as the parent, "info" + Logger log1 = Logger.getLogger(TEST_LOGGER_CHILD1); + assertTrue("Test logger's level was not the expected value", + log1.getEffectiveLevel().toString().equalsIgnoreCase("info")); + + //now change its level to "warn" + assertTrue("Failed to set logger level", lm.setRuntimeLoggerLevel(TEST_LOGGER_CHILD1, "warn")); + + //check the change, see its actual level is "warn + assertTrue("Test logger's level was not the expected value", + log1.getLevel().toString().equalsIgnoreCase("warn")); + + //try an invalid level + assertFalse("Trying to set an invalid level succeded", lm.setRuntimeLoggerLevel(TEST_LOGGER_CHILD1, "made.up.level")); + } + + public void testSetRuntimeRootLoggerLevel() + { + LoggingManagementMBean lm = null; + try + { + lm = new LoggingManagementMBean(_testConfigFile.getAbsolutePath(), 0, _registry); + } + catch (JMException e) + { + fail("Could not create test LoggingManagementMBean"); + } + + Logger log = Logger.getRootLogger(); + + //get current root logger level + Level origLevel = log.getLevel(); + + //change level twice to ensure a new level is actually selected + + //set root loggers level to info + assertTrue("Failed to set root logger level", lm.setRuntimeRootLoggerLevel("debug")); + //check it is now actually info + Level currentLevel = log.getLevel(); + assertTrue("Logger level was not expected value", currentLevel.equals(Level.toLevel("debug"))); + + //try an invalid level + assertFalse("Trying to set an invalid level succeded", lm.setRuntimeRootLoggerLevel("made.up.level")); + + //set root loggers level to warn + assertTrue("Failed to set logger level", lm.setRuntimeRootLoggerLevel("info")); + //check it is now actually warn + currentLevel = log.getLevel(); + assertTrue("Logger level was not expected value", currentLevel.equals(Level.toLevel("info"))); + + //restore original level + log.setLevel(origLevel); + } + + public void testGetRuntimeRootLoggerLevel() + { + LoggingManagementMBean lm = null; + try + { + lm = new LoggingManagementMBean(_testConfigFile.getAbsolutePath(), 0, _registry); + } + catch (JMException e) + { + fail("Could not create test LoggingManagementMBean"); + } + + Logger log = Logger.getRootLogger(); + + //get current root logger level + Level origLevel = log.getLevel(); + + //change level twice to ensure a new level is actually selected + + //set root loggers level to debug + log.setLevel(Level.toLevel("debug")); + //check it is now actually debug + assertTrue("Logger level was not expected value", lm.getRuntimeRootLoggerLevel().equalsIgnoreCase("debug")); + + + //set root loggers level to warn + log.setLevel(Level.toLevel("info")); + //check it is now actually warn + assertTrue("Logger level was not expected value", lm.getRuntimeRootLoggerLevel().equalsIgnoreCase("info")); + + //restore original level + log.setLevel(origLevel); + } + + public void testViewEffectiveRuntimeLoggerLevels() + { + LoggingManagementMBean lm = null; + try + { + lm = new LoggingManagementMBean(_testConfigFile.getAbsolutePath(), 0, _registry); + } + catch (JMException e) + { + fail("Could not create test LoggingManagementMBean"); + } + + //(re)create a parent test logger, set its level explicitly + Logger log = Logger.getLogger(TEST_LOGGER); + log.setLevel(Level.toLevel("info")); + + //retrieve the current effective runtime logger level values + TabularDataSupport levels = (TabularDataSupport) lm.viewEffectiveRuntimeLoggerLevels(); + Collection<Object> records = levels.values(); + Map<String,String> list = new HashMap<String,String>(); + for (Object o : records) + { + CompositeData data = (CompositeData) o; + list.put(data.get(LOGGER_NAME).toString(), data.get(LOGGER_LEVEL).toString()); + } + + //check child2 does not exist already + assertFalse("Did not expect this logger to exist already", list.containsKey(TEST_LOGGER_CHILD2)); + + //create child2 test logger + Logger log2 = Logger.getLogger(TEST_LOGGER_CHILD2); + + //retrieve the current effective runtime logger level values + levels = (TabularDataSupport) lm.viewEffectiveRuntimeLoggerLevels(); + records = levels.values(); + list = new HashMap<String,String>(); + for (Object o : records) + { + CompositeData data = (CompositeData) o; + list.put(data.get(LOGGER_NAME).toString(), data.get(LOGGER_LEVEL).toString()); + } + + //verify the parent and child2 loggers are present in returned values + assertTrue(TEST_LOGGER + " logger was not in the returned list", list.containsKey(TEST_LOGGER)); + assertTrue(TEST_LOGGER_CHILD2 + " logger was not in the returned list", list.containsKey(TEST_LOGGER_CHILD2)); + + //check child2's effective level is the same as the parent, "info" + assertTrue("Test logger's level was not the expected value", + list.get(TEST_LOGGER_CHILD2).equalsIgnoreCase("info")); + + //now change its level explicitly to "warn" + log2.setLevel(Level.toLevel("warn")); + + //retrieve the current effective runtime logger level values + levels = (TabularDataSupport) lm.viewEffectiveRuntimeLoggerLevels(); + records = levels.values(); + list = new HashMap<String,String>(); + for (Object o : records) + { + CompositeData data = (CompositeData) o; + list.put(data.get(LOGGER_NAME).toString(), data.get(LOGGER_LEVEL).toString()); + } + + //check child2's effective level is now "warn" + assertTrue("Test logger's level was not the expected value", + list.get(TEST_LOGGER_CHILD2).equalsIgnoreCase("warn")); + } + + public void testViewAndSetConfigFileLoggerLevel() throws Exception + { + LoggingManagementMBean lm =null; + try + { + lm = new LoggingManagementMBean(_testConfigFile.getAbsolutePath(), 0, _registry); + } + catch (JMException e) + { + fail("Could not create test LoggingManagementMBean"); + } + + //retrieve the current values + TabularDataSupport levels = (TabularDataSupport) lm.viewConfigFileLoggerLevels(); + Collection<Object> records = levels.values(); + Map<String,String> list = new HashMap<String,String>(); + for (Object o : records) + { + CompositeData data = (CompositeData) o; + list.put(data.get(LOGGER_NAME).toString(), data.get(LOGGER_LEVEL).toString()); + } + + //check the 3 different types of logger definition are successfully retrieved before update + assertTrue("Wrong number of items in returned list", list.size() == 3); + assertTrue(TEST_CATEGORY_PRIORITY + " logger was not in the returned list", list.containsKey(TEST_CATEGORY_PRIORITY)); + assertTrue(TEST_CATEGORY_LEVEL + " logger was not in the returned list", list.containsKey(TEST_CATEGORY_LEVEL)); + assertTrue(TEST_LOGGER_LEVEL + " logger was not in the returned list", list.containsKey(TEST_LOGGER_LEVEL)); + + //check that their level is as expected + assertTrue(TEST_CATEGORY_PRIORITY + " logger's level was incorrect", list.get(TEST_CATEGORY_PRIORITY).equalsIgnoreCase("info")); + assertTrue(TEST_CATEGORY_LEVEL + " logger's level was incorrect", list.get(TEST_CATEGORY_LEVEL).equalsIgnoreCase("warn")); + assertTrue(TEST_LOGGER_LEVEL + " logger's level was incorrect", list.get(TEST_LOGGER_LEVEL).equalsIgnoreCase("error")); + + //increase their levels a notch to test the 3 different types of logger definition are successfully updated + //change the category+priority to warn + assertTrue("failed to set new level", lm.setConfigFileLoggerLevel(TEST_CATEGORY_PRIORITY, "warn")); + //change the category+level to error + assertTrue("failed to set new level", lm.setConfigFileLoggerLevel(TEST_CATEGORY_LEVEL, "error")); + //change the logger+level to trace + assertTrue("failed to set new level", lm.setConfigFileLoggerLevel(TEST_LOGGER_LEVEL, "trace")); + + //try an invalid level + assertFalse("Use of an invalid logger level was successfull", lm.setConfigFileLoggerLevel(TEST_LOGGER_LEVEL, "made.up.level")); + + //try an invalid logger name + assertFalse("Use of an invalid logger name was successfull", lm.setConfigFileLoggerLevel("made.up.logger.name", "info")); + + //retrieve the new values from the file and check them + levels = (TabularDataSupport) lm.viewConfigFileLoggerLevels(); + records = levels.values(); + list = new HashMap<String,String>(); + for (Object o : records) + { + CompositeData data = (CompositeData) o; + list.put(data.get(LOGGER_NAME).toString(), data.get(LOGGER_LEVEL).toString()); + } + + //check the 3 different types of logger definition are successfully retrieved after update + assertTrue("Wrong number of items in returned list", list.size() == 3); + assertTrue(TEST_CATEGORY_PRIORITY + " logger was not in the returned list", list.containsKey(TEST_CATEGORY_PRIORITY)); + assertTrue(TEST_CATEGORY_LEVEL + " logger was not in the returned list", list.containsKey(TEST_CATEGORY_LEVEL)); + assertTrue(TEST_LOGGER_LEVEL + " logger was not in the returned list", list.containsKey(TEST_LOGGER_LEVEL)); + + //check that their level is as expected after the changes + assertTrue(TEST_CATEGORY_PRIORITY + " logger's level was incorrect", list.get(TEST_CATEGORY_PRIORITY).equalsIgnoreCase("warn")); + assertTrue(TEST_CATEGORY_LEVEL + " logger's level was incorrect", list.get(TEST_CATEGORY_LEVEL).equalsIgnoreCase("error")); + assertTrue(TEST_LOGGER_LEVEL + " logger's level was incorrect", list.get(TEST_LOGGER_LEVEL).equalsIgnoreCase("trace")); + } + + public void testGetAndSetConfigFileRootLoggerLevel() throws Exception + { + LoggingManagementMBean lm =null; + try + { + lm = new LoggingManagementMBean(_testConfigFile.getAbsolutePath(), 0, _registry); + } + catch (JMException e) + { + fail("Could not create test LoggingManagementMBean"); + } + + //retrieve the current value + String level = lm.getConfigFileRootLoggerLevel(); + + //check the value was successfully retrieved before update + assertTrue("Retrieved RootLogger level was incorrect", level.equalsIgnoreCase("info")); + + //try an invalid level + assertFalse("Use of an invalid RootLogger level was successfull", lm.setConfigFileRootLoggerLevel("made.up.level")); + + //change the level to warn + assertTrue("Failed to set new RootLogger level", lm.setConfigFileRootLoggerLevel("warn")); + + //retrieve the current value + level = lm.getConfigFileRootLoggerLevel(); + + //check the value was successfully retrieved after update + assertTrue("Retrieved RootLogger level was incorrect", level.equalsIgnoreCase("warn")); + } + + public void testGetLog4jLogWatchInterval() + { + LoggingManagementMBean lm =null; + try + { + lm = new LoggingManagementMBean(_testConfigFile.getAbsolutePath(), 5000, _registry); + } + catch (JMException e) + { + fail("Could not create test LoggingManagementMBean"); + } + + assertTrue("Wrong value returned for logWatch period", lm.getLog4jLogWatchInterval() == 5000); + } + +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/MBeanTestUtils.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/MBeanTestUtils.java new file mode 100644 index 0000000000..5f913e5f33 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/MBeanTestUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx.mbeans; + +import junit.framework.TestCase; + +import org.apache.commons.beanutils.PropertyUtils; +import org.apache.qpid.server.jmx.DefaultManagedObject; + +public class MBeanTestUtils +{ + + public static void assertMBeanAttribute(DefaultManagedObject managedObject, String jmxAttributeName, Object expectedValue) throws Exception + { + Object actualValue = PropertyUtils.getSimpleProperty(managedObject, jmxAttributeName); + TestCase.assertEquals("Attribute " + jmxAttributeName + " has unexpected value", expectedValue, actualValue); + } + + public static void setMBeanAttribute(DefaultManagedObject managedObject, String jmxAttributeName, Object newValue) throws Exception + { + PropertyUtils.setSimpleProperty(managedObject, jmxAttributeName, newValue); + } +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java new file mode 100644 index 0000000000..2003c12735 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.jmx.mbeans; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Matchers.isNull; +import static org.mockito.Matchers.argThat; + +import java.util.Arrays; +import java.util.Collections; + +import javax.management.ListenerNotFoundException; +import javax.management.Notification; +import javax.management.NotificationListener; +import javax.management.OperationsException; + +import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.Statistics; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.queue.NotificationCheck; +import org.mockito.ArgumentMatcher; + +import junit.framework.TestCase; + +public class QueueMBeanTest extends TestCase +{ + private static final String QUEUE_NAME = "QUEUE_NAME"; + private static final String QUEUE_DESCRIPTION = "QUEUE_DESCRIPTION"; + private static final String QUEUE_TYPE = "QUEUE_TYPE"; + private static final String QUEUE_ALTERNATE_EXCHANGE = "QUEUE_ALTERNATE_EXCHANGE"; + + private Queue _mockQueue; + private Statistics _mockQueueStatistics; + private VirtualHostMBean _mockVirtualHostMBean; + private ManagedObjectRegistry _mockManagedObjectRegistry; + private QueueMBean _queueMBean; + + @Override + protected void setUp() throws Exception + { + _mockQueue = mock(Queue.class); + _mockQueueStatistics = mock(Statistics.class); + when(_mockQueue.getName()).thenReturn(QUEUE_NAME); + when(_mockQueue.getStatistics()).thenReturn(_mockQueueStatistics); + _mockVirtualHostMBean = mock(VirtualHostMBean.class); + + _mockManagedObjectRegistry = mock(ManagedObjectRegistry.class); + when(_mockVirtualHostMBean.getRegistry()).thenReturn(_mockManagedObjectRegistry); + + _queueMBean = new QueueMBean(_mockQueue, _mockVirtualHostMBean); + } + + public void testQueueName() + { + assertEquals(QUEUE_NAME, _queueMBean.getName()); + } + + /********** Statistics **********/ + + public void testGetMessageCount() throws Exception + { + assertStatistic("messageCount", 1000, Queue.QUEUE_DEPTH_MESSAGES); + } + + public void testGetReceivedMessageCount() throws Exception + { + assertStatistic("receivedMessageCount", 1000l, Queue.TOTAL_ENQUEUED_MESSAGES); + } + + public void testQueueDepth() throws Exception + { + assertStatistic("queueDepth", 4096l, Queue.QUEUE_DEPTH_BYTES); + } + + public void testActiveConsumerCount() throws Exception + { + assertStatistic("activeConsumerCount", 3, Queue.CONSUMER_COUNT_WITH_CREDIT); + } + + public void testConsumerCount() throws Exception + { + assertStatistic("consumerCount", 3, Queue.CONSUMER_COUNT); + } + + /********** Simple Attributes **********/ + + public void testGetQueueDescription() throws Exception + { + assertAttribute("description", QUEUE_DESCRIPTION, Queue.DESCRIPTION); + } + + public void testSetQueueDescription() throws Exception + { + testSetAttribute("description", Queue.DESCRIPTION, "descriptionold", "descriptionnew"); + } + + public void testQueueType() throws Exception + { + assertAttribute("queueType", QUEUE_TYPE, Queue.TYPE); + } + + public void testMaximumDeliveryCount() throws Exception + { + assertAttribute("maximumDeliveryCount", 5, Queue.MAXIMUM_DELIVERY_ATTEMPTS); + } + + public void testOwner() throws Exception + { + assertAttribute("owner", "testOwner", Queue.OWNER); + } + + public void testIsDurable() throws Exception + { + when(_mockQueue.isDurable()).thenReturn(true); + assertTrue(_queueMBean.isDurable()); + } + + public void testIsNotDurable() throws Exception + { + when(_mockQueue.isDurable()).thenReturn(false); + assertFalse(_queueMBean.isDurable()); + } + + public void testIsAutoDelete() throws Exception + { + when(_mockQueue.getLifetimePolicy()).thenReturn(LifetimePolicy.AUTO_DELETE); + assertTrue(_queueMBean.isAutoDelete()); + } + + public void testIsNotAutoDelete() throws Exception + { + when(_mockQueue.getLifetimePolicy()).thenReturn(LifetimePolicy.PERMANENT); + assertFalse(_queueMBean.isAutoDelete()); + } + + public void testGetMaximumMessageAge() throws Exception + { + assertAttribute("maximumMessageAge", 10000l, Queue.ALERT_THRESHOLD_MESSAGE_AGE); + } + + public void testSetMaximumMessageAge() throws Exception + { + testSetAttribute("maximumMessageAge", Queue.ALERT_THRESHOLD_MESSAGE_AGE, 1000l, 10000l); + } + + public void testGetMaximumMessageSize() throws Exception + { + assertAttribute("maximumMessageSize", 1024l, Queue.ALERT_THRESHOLD_MESSAGE_SIZE); + } + + public void testSetMaximumMessageSize() throws Exception + { + testSetAttribute("maximumMessageSize", Queue.ALERT_THRESHOLD_MESSAGE_SIZE, 1024l, 2048l); + } + + public void testGetMaximumMessageCount() throws Exception + { + assertAttribute("maximumMessageCount", 5000l, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES); + } + + public void testSetMaximumMessageCount() throws Exception + { + testSetAttribute("maximumMessageCount", Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 4000l, 5000l); + } + + public void testGetMaximumQueueDepth() throws Exception + { + assertAttribute("maximumQueueDepth", 1048576l, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES); + } + + public void testSetMaximumQueueDepth() throws Exception + { + testSetAttribute("maximumQueueDepth", Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,1048576l , 2097152l); + } + + public void testGetCapacity() throws Exception + { + assertAttribute("capacity", 1048576l, Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES); + } + + public void testSetCapacity() throws Exception + { + testSetAttribute("capacity", Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES,1048576l , 2097152l); + } + + public void testGetFlowResumeCapacity() throws Exception + { + assertAttribute("flowResumeCapacity", 1048576l, Queue.QUEUE_FLOW_RESUME_SIZE_BYTES); + } + + public void testSetFlowResumeCapacity() throws Exception + { + testSetAttribute("flowResumeCapacity", Queue.QUEUE_FLOW_RESUME_SIZE_BYTES,1048576l , 2097152l); + } + + public void testIsExclusive() throws Exception + { + assertAttribute("exclusive", Boolean.TRUE, Queue.EXCLUSIVE); + } + + public void testIsNotExclusive() throws Exception + { + assertAttribute("exclusive", Boolean.FALSE, Queue.EXCLUSIVE); + } + + public void testSetExclusive() throws Exception + { + testSetAttribute("exclusive", Queue.EXCLUSIVE, Boolean.FALSE , Boolean.TRUE); + } + + /********** Other attributes **********/ + + public void testGetAlternateExchange() + { + Exchange mockAlternateExchange = mock(Exchange.class); + when(mockAlternateExchange.getName()).thenReturn(QUEUE_ALTERNATE_EXCHANGE); + + when(_mockQueue.getAttribute(Queue.ALTERNATE_EXCHANGE)).thenReturn(mockAlternateExchange); + + assertEquals(QUEUE_ALTERNATE_EXCHANGE, _queueMBean.getAlternateExchange()); + } + + public void testGetAlternateExchangeWhenQueueHasNone() + { + when(_mockQueue.getAttribute(Queue.ALTERNATE_EXCHANGE)).thenReturn(null); + + assertNull(_queueMBean.getAlternateExchange()); + } + + public void testSetAlternateExchange() throws Exception + { + Exchange mockExchange1 = mock(Exchange.class); + when(mockExchange1.getName()).thenReturn("exchange1"); + + Exchange mockExchange2 = mock(Exchange.class); + when(mockExchange2.getName()).thenReturn("exchange2"); + + Exchange mockExchange3 = mock(Exchange.class); + when(mockExchange3.getName()).thenReturn("exchange3"); + + VirtualHost mockVirtualHost = mock(VirtualHost.class); + when(mockVirtualHost.getExchanges()).thenReturn(Arrays.asList(new Exchange[] {mockExchange1, mockExchange2, mockExchange3})); + when(_mockQueue.getParent(VirtualHost.class)).thenReturn(mockVirtualHost); + + _queueMBean.setAlternateExchange("exchange2"); + verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null, mockExchange2); + } + + public void testSetAlternateExchangeWithUnknownExchangeName() throws Exception + { + Exchange mockExchange = mock(Exchange.class); + when(mockExchange.getName()).thenReturn("exchange1"); + + VirtualHost mockVirtualHost = mock(VirtualHost.class); + when(mockVirtualHost.getExchanges()).thenReturn(Collections.singletonList(mockExchange)); + when(_mockQueue.getParent(VirtualHost.class)).thenReturn(mockVirtualHost); + + try + { + _queueMBean.setAlternateExchange("notknown"); + fail("Exception not thrown"); + } + catch(OperationsException oe) + { + // PASS + } + } + + public void testRemoveAlternateExchange() throws Exception + { + _queueMBean.setAlternateExchange(""); + verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null, null); + } + + /********** Operations **********/ + + /********** Notifications **********/ + + public void testNotificationListenerCalled() throws Exception + { + NotificationListener listener = mock(NotificationListener.class); + _queueMBean.addNotificationListener(listener, null, null); + + NotificationCheck notification = mock(NotificationCheck.class); + String notificationMsg = "Test notification message"; + + _queueMBean.notifyClients(notification, _mockQueue, notificationMsg); + verify(listener).handleNotification(isNotificationWithMessage(notificationMsg), + isNull()); + } + + public void testAddRemoveNotificationListener() throws Exception + { + NotificationListener listener1 = mock(NotificationListener.class); + _queueMBean.addNotificationListener(listener1, null, null); + _queueMBean.removeNotificationListener(listener1); + } + + public void testRemoveUnknownNotificationListener() throws Exception + { + NotificationListener listener1 = mock(NotificationListener.class); + try + { + _queueMBean.removeNotificationListener(listener1); + fail("Exception not thrown"); + } + catch (ListenerNotFoundException e) + { + // PASS + } + } + + private Notification isNotificationWithMessage(final String expectedMessage) + { + return argThat( new ArgumentMatcher<Notification>() + { + @Override + public boolean matches(Object argument) + { + Notification actual = (Notification) argument; + return actual.getMessage().endsWith(expectedMessage); + } + }); + } + + private void assertStatistic(String jmxAttributeName, Object expectedValue, String underlyingAttributeName) throws Exception + { + when(_mockQueueStatistics.getStatistic(underlyingAttributeName)).thenReturn(expectedValue); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, jmxAttributeName, expectedValue); + } + + private void assertAttribute(String jmxAttributeName, Object expectedValue, String underlyingAttributeName) throws Exception + { + when(_mockQueue.getAttribute(underlyingAttributeName)).thenReturn(expectedValue); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, jmxAttributeName, expectedValue); + } + + private void testSetAttribute(String jmxAttributeName, String underlyingAttributeName, Object originalAttributeValue, Object newAttributeValue) throws Exception + { + when(_mockQueue.getAttribute(underlyingAttributeName)).thenReturn(originalAttributeValue); + + MBeanTestUtils.setMBeanAttribute(_queueMBean, jmxAttributeName, newAttributeValue); + + verify(_mockQueue).setAttribute(underlyingAttributeName, originalAttributeValue, newAttributeValue); + } + +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/UserManagementMBeanTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/UserManagementMBeanTest.java new file mode 100644 index 0000000000..8ca6c521eb --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/UserManagementMBeanTest.java @@ -0,0 +1,157 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.jmx.mbeans; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; +import javax.security.auth.login.AccountNotFoundException; + +import junit.framework.TestCase; + +import org.apache.qpid.management.common.mbeans.UserManagement; +import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.model.PasswordCredentialManagingAuthenticationProvider; + +public class UserManagementMBeanTest extends TestCase +{ + private UserManagementMBean _userManagement; + private ManagedObjectRegistry _mockRegistry; + private PasswordCredentialManagingAuthenticationProvider _mockProvider; + + private static final String TEST_USERNAME = "testuser"; + private static final String TEST_PASSWORD = "password"; + + @Override + public void setUp() throws Exception + { + super.setUp(); + + _mockProvider = mock(PasswordCredentialManagingAuthenticationProvider.class); + _mockRegistry = mock(ManagedObjectRegistry.class); + _userManagement = new UserManagementMBean(_mockProvider, _mockRegistry); + } + + public void testMBeanRegistersItself() throws Exception + { + UserManagementMBean userManagementMBean = new UserManagementMBean(_mockProvider, _mockRegistry); + verify(_mockRegistry).registerObject(userManagementMBean); + } + + public void testDeleteUser() throws Exception + { + boolean deleteSuccess = _userManagement.deleteUser(TEST_USERNAME); + assertTrue("Expected successful delete", deleteSuccess); + + verify(_mockProvider).deleteUser(TEST_USERNAME); + } + + public void testDeleteUserWhereUserDoesNotExist() throws Exception + { + doThrow(AccountNotFoundException.class).when(_mockProvider).deleteUser(TEST_USERNAME); + + boolean deleteSuccess = _userManagement.deleteUser(TEST_USERNAME); + assertFalse("Expected unsuccessful delete", deleteSuccess); + } + + public void testCreateUser() throws Exception + { + when(_mockProvider.createUser(TEST_USERNAME, TEST_PASSWORD, null)).thenReturn(true); + + boolean createSuccess = _userManagement.createUser(TEST_USERNAME, TEST_PASSWORD); + assertTrue(createSuccess); + } + + public void testCreateUserWhereUserAlreadyExists() + { + when(_mockProvider.createUser(TEST_USERNAME, TEST_PASSWORD, null)).thenReturn(false); + + boolean createSuccess = _userManagement.createUser(TEST_USERNAME, TEST_PASSWORD); + assertFalse(createSuccess); + } + + public void testSetPassword() throws Exception + { + boolean setPasswordSuccess = _userManagement.setPassword(TEST_USERNAME, TEST_PASSWORD); + assertTrue(setPasswordSuccess); + + assertTrue("Set password should return true to flag successful change", setPasswordSuccess); + + verify(_mockProvider).setPassword(TEST_USERNAME, TEST_PASSWORD); + } + + public void testSetPasswordWhereUserDoesNotExist() throws Exception + { + doThrow(AccountNotFoundException.class).when(_mockProvider).setPassword(TEST_USERNAME, TEST_PASSWORD); + + boolean setPasswordSuccess = _userManagement.setPassword(TEST_USERNAME, TEST_PASSWORD); + + assertFalse("Set password should return false to flag unsuccessful change", setPasswordSuccess); + } + + public void testReload() throws Exception + { + boolean reloadSuccess = _userManagement.reloadData(); + + assertTrue("Reload should return true to flag succesful update", reloadSuccess); + + verify(_mockProvider).reload(); + } + + public void testReloadFails() throws Exception + { + doThrow(IOException.class).when(_mockProvider).reload(); + + boolean reloadSuccess = _userManagement.reloadData(); + + assertFalse("Expected reload to fail", reloadSuccess); + } + + public void testViewUsers() throws Exception + { + Map<String,String> args = Collections.emptyMap(); + when(_mockProvider.getUsers()).thenReturn(Collections.singletonMap(TEST_USERNAME, args)); + + TabularData userList = _userManagement.viewUsers(); + + assertNotNull(userList); + assertEquals("Unexpected number of users in user list", 1, userList.size()); + assertTrue(userList.containsKey(new Object[]{TEST_USERNAME})); + + // Check the deprecated read, write and admin items continue to exist but return false. + CompositeData userRec = userList.get(new Object[]{TEST_USERNAME}); + assertTrue(userRec.containsKey(UserManagement.RIGHTS_READ_ONLY)); + assertEquals(false, userRec.get(UserManagement.RIGHTS_READ_ONLY)); + assertEquals(false, userRec.get(UserManagement.RIGHTS_READ_WRITE)); + assertTrue(userRec.containsKey(UserManagement.RIGHTS_READ_WRITE)); + assertTrue(userRec.containsKey(UserManagement.RIGHTS_ADMIN)); + assertEquals(false, userRec.get(UserManagement.RIGHTS_ADMIN)); + } +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java new file mode 100644 index 0000000000..93a80665a9 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.jmx.mbeans; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.Map; + +import javax.management.OperationsException; + +import junit.framework.TestCase; + +import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.mockito.verification.VerificationMode; + +public class VirtualHostManagerMBeanTest extends TestCase +{ + private static final String TEST_QUEUE_NAME = "QUEUE_NAME"; + private static final String TEST_EXCHANGE_NAME = "EXCHANGE_NAME"; + private static final String TEST_OWNER = "OWNER"; + private static final String TEST_DESCRIPTION = "DESCRIPTION"; + private static final String TEST_EXCHANGE_TYPE = "EXCHANGE_TYPE"; + + private static final Map<String, Object> EMPTY_ARGUMENT_MAP = Collections.emptyMap(); + + private VirtualHost _mockVirtualHost; + private ManagedObjectRegistry _mockManagedObjectRegistry; + private VirtualHostManagerMBean _virtualHostManagerMBean; + + @Override + protected void setUp() throws Exception + { + _mockVirtualHost = mock(VirtualHost.class); + when(_mockVirtualHost.getExchangeTypes()).thenReturn(Collections.singletonList(TEST_EXCHANGE_TYPE)); + + _mockManagedObjectRegistry = mock(ManagedObjectRegistry.class); + + _virtualHostManagerMBean = new VirtualHostManagerMBean(new VirtualHostMBean(_mockVirtualHost, _mockManagedObjectRegistry)); + } + + public void testCreateQueueWithNoOwner() throws Exception + { + _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, null, true); + + verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, EMPTY_ARGUMENT_MAP); + } + + /** + * Some users have been abusing the owner parameter as a description. Decision has been taken to map this parameter + * through to the description field (if the description field is passed, the owner is discarded). + */ + public void testCreateQueueWithOwnerMappedThroughToDescription() throws Exception + { + _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true); + + Map<String, Object> expectedArguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_OWNER); + verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments); + } + + public void testCreateQueueWithOwnerAndDescriptionDiscardsOwner() throws Exception + { + Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION); + _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true, arguments); + + Map<String, Object> expectedArguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION); + verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments); + } + + public void testDeleteQueue() throws Exception + { + Queue mockQueue = mock(Queue.class); + when(mockQueue.getName()).thenReturn("queue1"); + when(_mockVirtualHost.getQueues()).thenReturn(Collections.singletonList(mockQueue)); + + _virtualHostManagerMBean.deleteQueue("queue1"); + verify(mockQueue).delete(); + } + + public void testDeleteQueueWhenQueueDoesNotExist() throws Exception + { + Queue mockQueue = mock(Queue.class); + when(mockQueue.getName()).thenReturn("queue1"); + when(_mockVirtualHost.getQueues()).thenReturn(Collections.singletonList(mockQueue)); + + try + { + _virtualHostManagerMBean.deleteQueue("unknownqueue"); + fail("Exception not thrown"); + } + catch(OperationsException oe) + { + // PASS + assertEquals("No such queue \"unknownqueue\"", oe.getMessage()); + } + verify(mockQueue, never()).delete(); + } + + public void testCreateNewDurableExchange() throws Exception + { + _virtualHostManagerMBean.createNewExchange(TEST_EXCHANGE_NAME, TEST_EXCHANGE_TYPE, true); + verify(_mockVirtualHost).createExchange(TEST_EXCHANGE_NAME, State.ACTIVE, true, LifetimePolicy.PERMANENT, 0, TEST_EXCHANGE_TYPE, EMPTY_ARGUMENT_MAP); + } + + public void testCreateNewExchangeWithUnknownExchangeType() throws Exception + { + String exchangeType = "notknown"; + try + { + _virtualHostManagerMBean.createNewExchange(TEST_EXCHANGE_NAME, exchangeType, true); + fail("Exception not thrown"); + } + catch (OperationsException oe) + { + // PASS + } + verify(_mockVirtualHost, never()).createExchange(TEST_EXCHANGE_NAME, State.ACTIVE, true, LifetimePolicy.PERMANENT, 0, exchangeType, EMPTY_ARGUMENT_MAP); + } + + public void testUnregisterExchange() throws Exception + { + Exchange mockExchange = mock(Exchange.class); + when(mockExchange.getName()).thenReturn("exchange1"); + when(_mockVirtualHost.getExchanges()).thenReturn(Collections.singletonList(mockExchange)); + + _virtualHostManagerMBean.unregisterExchange("exchange1"); + verify(mockExchange).delete(); + } + + public void testUnregisterExchangeWhenExchangeDoesNotExist() throws Exception + { + Exchange mockExchange = mock(Exchange.class); + when(mockExchange.getName()).thenReturn("exchange1"); + when(_mockVirtualHost.getExchanges()).thenReturn(Collections.singletonList(mockExchange)); + + try + { + _virtualHostManagerMBean.unregisterExchange("unknownexchange"); + fail("Exception not thrown"); + } + catch(OperationsException oe) + { + // PASS + assertEquals("No such exchange \"unknownexchange\"", oe.getMessage()); + } + + verify(mockExchange, never()).delete(); + } +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java new file mode 100644 index 0000000000..2c341b7f2e --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.systest.management.jmx; + +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.management.MBeanException; +import javax.management.ObjectName; + +/** + * Tests the JMX API for the Managed Broker. + * + */ +public class BrokerManagementTest extends QpidBrokerTestCase +{ + private static final String VIRTUAL_HOST = "test"; + + /** + * JMX helper. + */ + private JMXTestUtils _jmxUtils; + private ManagedBroker _managedBroker; + + public void setUp() throws Exception + { + _jmxUtils = new JMXTestUtils(this); + _jmxUtils.setUp(); + super.setUp(); + _jmxUtils.open(); + _managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + } + + public void tearDown() throws Exception + { + if (_jmxUtils != null) + { + _jmxUtils.close(); + } + super.tearDown(); + } + + /** + * Tests queue creation/deletion also verifying the automatic binding to the default exchange. + */ + public void testCreateQueueAndDeletion() throws Exception + { + final String queueName = getTestQueueName(); + final ManagedExchange defaultExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); + + // Check that bind does not exist before queue creation + assertFalse("Binding to " + queueName + " should not exist in default exchange before queue creation", + defaultExchange.bindings().containsKey(new String[] {queueName})); + + _managedBroker.createNewQueue(queueName, "testowner", true); + + // Ensure the queue exists + assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName)); + assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName)); + + // Now verify that the default exchange has been bound. + assertTrue("Binding to " + queueName + " should exist in default exchange after queue creation", + defaultExchange.bindings().containsKey(new String[] {queueName})); + + // Now delete the queue + _managedBroker.deleteQueue(queueName); + + // Finally ensure that the binding has been removed. + assertFalse("Binding to " + queueName + " should not exist in default exchange after queue deletion", + defaultExchange.bindings().containsKey(new String[] {queueName})); + } + + /** + * Tests exchange creation/deletion via JMX API. + */ + public void testCreateExchangeAndUnregister() throws Exception + { + String exchangeName = getTestName(); + _managedBroker.createNewExchange(exchangeName, "topic", true); + + ManagedExchange exchange = _jmxUtils.getManagedExchange(exchangeName); + assertNotNull("Exchange should exist", exchange); + + _managedBroker.unregisterExchange(exchangeName); + } + + /** + * Tests that it is disallowed to unregister the default exchange. + */ + public void testUnregisterOfDefaultExchangeDisallowed() throws Exception + { + String defaultExchangeName = ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(); + + try + { + _managedBroker.unregisterExchange(defaultExchangeName); + fail("Exception not thrown"); + } + catch (MBeanException mbe) + { + // PASS + assertEquals("Error in unregistering exchange " + defaultExchangeName, mbe.getMessage()); + assertTrue(mbe.getCause().getMessage().contains("Cannot unregister the default exchange")); + } + final ManagedExchange defaultExchange = _jmxUtils.getManagedExchange(defaultExchangeName); + assertNotNull("Exchange should exist", defaultExchange); + } + +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java new file mode 100644 index 0000000000..28d7bf4aed --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.systest.management.jmx; + +import java.io.IOException; +import java.util.Date; +import java.util.Iterator; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.TabularData; + +import org.apache.commons.lang.StringUtils; +import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.management.common.mbeans.ManagedConnection; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class ConnectionManagementTest extends QpidBrokerTestCase +{ + private static final String VIRTUAL_HOST_NAME = "test"; + + private JMXTestUtils _jmxUtils; + private Connection _connection; + + public void setUp() throws Exception + { + _jmxUtils = new JMXTestUtils(this); + _jmxUtils.setUp(); // modifies broker config therefore must be done before super.setUp() + super.setUp(); + _jmxUtils.open(); + } + + public void tearDown() throws Exception + { + try + { + if (_jmxUtils != null) + { + _jmxUtils.close(); + } + } + finally + { + super.tearDown(); + } + } + + public void testNumberOfManagedConnectionsMatchesNumberOfClientConnections() throws Exception + { + assertEquals("Expected no managed connections", 0, getManagedConnections().size()); + + _connection = getConnection(); + assertEquals("Expected one managed connection", 1, getManagedConnections().size()); + + _connection.close(); + assertEquals("Expected no managed connections after client connection closed", 0, getManagedConnections().size()); + } + + public void testGetAttributes() throws Exception + { + _connection = getConnection(); + final ManagedConnection mBean = getConnectionMBean(); + + checkAuthorisedId(mBean); + checkClientVersion(mBean); + checkClientId(mBean); + } + + public void testNonTransactedSession() throws Exception + { + _connection = getConnection(); + + boolean transactional = false; + boolean flowBlocked = false; + + _connection.createSession(transactional, Session.AUTO_ACKNOWLEDGE); + + final ManagedConnection mBean = getConnectionMBean(); + final CompositeDataSupport row = getTheOneChannelRow(mBean); + assertChannelRowData(row, 0, transactional, flowBlocked); + } + + public void testTransactedSessionWithUnackMessages() throws Exception + { + _connection = getConnection(); + _connection.start(); + + boolean transactional = true; + int numberOfMessages = 2; + final Session session = _connection.createSession(transactional, Session.SESSION_TRANSACTED); + final Destination destination = session.createQueue(getTestQueueName()); + final MessageConsumer consumer = session.createConsumer(destination); + + sendMessage(session, destination, numberOfMessages); + receiveMessagesWithoutCommit(consumer, numberOfMessages); + + final ManagedConnection mBean = getConnectionMBean(); + final CompositeDataSupport row = getTheOneChannelRow(mBean); + boolean flowBlocked = false; + assertChannelRowData(row, numberOfMessages, transactional, flowBlocked); + + // check that commit advances the lastIoTime + final Date initialLastIOTime = mBean.getLastIoTime(); + session.commit(); + assertTrue("commit should have caused last IO time to advance", mBean.getLastIoTime().after(initialLastIOTime)); + + // check that channels() now returns one session with no unacknowledged messages + final CompositeDataSupport rowAfterCommit = getTheOneChannelRow(mBean); + final Number unackCountAfterCommit = (Number) rowAfterCommit.get(ManagedConnection.UNACKED_COUNT); + assertEquals("Unexpected number of unacknowledged messages", 0, unackCountAfterCommit); + } + + + public void testProducerFlowBlocked() throws Exception + { + _connection = getConnection(); + _connection.start(); + + String queueName = getTestQueueName(); + Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(queueName); + createQueueOnBroker(session, queue); + + ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + managedQueue.setFlowResumeCapacity(DEFAULT_MESSAGE_SIZE * 2l); + managedQueue.setCapacity(DEFAULT_MESSAGE_SIZE * 3l); + + final ManagedConnection managedConnection = getConnectionMBean(); + + // Check that producer flow is not block before test + final CompositeDataSupport rowBeforeSend = getTheOneChannelRow(managedConnection); + assertFlowBlocked(rowBeforeSend, false); + + + // Check that producer flow does not become block too soon + sendMessage(session, queue, 3); + final CompositeDataSupport rowBeforeFull = getTheOneChannelRow(managedConnection); + assertFlowBlocked(rowBeforeFull, false); + + // Fourth message will over-fill the queue (but as we are not sending more messages, client thread wont't block) + sendMessage(session, queue, 1); + final CompositeDataSupport rowAfterFull = getTheOneChannelRow(managedConnection); + assertFlowBlocked(rowAfterFull, true); + + // Consume two to bring the queue down to the resume capacity + MessageConsumer consumer = session.createConsumer(queue); + assertNotNull("Could not receive first message", consumer.receive(1000)); + assertNotNull("Could not receive second message", consumer.receive(1000)); + session.commit(); + + // Check that producer flow is no longer blocked + final CompositeDataSupport rowAfterReceive = getTheOneChannelRow(managedConnection); + assertFlowBlocked(rowAfterReceive, false); + } + + private void createQueueOnBroker(Session session, Destination destination) throws JMSException + { + session.createConsumer(destination).close(); // Create a consumer only to cause queue creation + } + + private void assertChannelRowData(final CompositeData row, int unacknowledgedMessages, boolean isTransactional, boolean flowBlocked) + { + assertNotNull(row); + assertEquals("Unexpected transactional flag", isTransactional, row.get(ManagedConnection.TRANSACTIONAL)); + assertEquals("Unexpected unacknowledged message count", unacknowledgedMessages, row.get(ManagedConnection.UNACKED_COUNT)); + assertEquals("Unexpected flow blocked", flowBlocked, row.get(ManagedConnection.FLOW_BLOCKED)); + } + + private void assertFlowBlocked(final CompositeData row, boolean flowBlocked) + { + assertNotNull(row); + assertEquals("Unexpected flow blocked", flowBlocked, row.get(ManagedConnection.FLOW_BLOCKED)); + } + + private void checkAuthorisedId(ManagedConnection mBean) throws Exception + { + assertEquals("Unexpected authorized id", GUEST_USERNAME, mBean.getAuthorizedId()); + } + + private void checkClientVersion(ManagedConnection mBean) throws Exception + { + String expectedVersion = QpidProperties.getReleaseVersion(); + assertTrue(StringUtils.isNotBlank(expectedVersion)); + + assertEquals("Unexpected version", expectedVersion, mBean.getVersion()); + } + + private void checkClientId(ManagedConnection mBean) throws Exception + { + String expectedClientId = _connection.getClientID(); + assertTrue(StringUtils.isNotBlank(expectedClientId)); + + assertEquals("Unexpected ClientId", expectedClientId, mBean.getClientId()); + } + + private ManagedConnection getConnectionMBean() + { + List<ManagedConnection> connections = getManagedConnections(); + assertNotNull("Connection MBean is not found", connections); + assertEquals("Unexpected number of connection mbeans", 1, connections.size()); + final ManagedConnection mBean = connections.get(0); + assertNotNull("Connection MBean is null", mBean); + return mBean; + } + + private List<ManagedConnection> getManagedConnections() + { + return _jmxUtils.getManagedConnections(VIRTUAL_HOST_NAME); + } + + private CompositeDataSupport getTheOneChannelRow(final ManagedConnection mBean) throws Exception + { + TabularData channelsData = getChannelsDataWithRetry(mBean); + + assertEquals("Unexpected number of rows in channel table", 1, channelsData.size()); + + @SuppressWarnings("unchecked") + final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) channelsData.values().iterator(); + final CompositeDataSupport row = rowItr.next(); + return row; + } + + private void receiveMessagesWithoutCommit(final MessageConsumer consumer, int numberOfMessages) throws Exception + { + for (int i = 0; i < numberOfMessages; i++) + { + final Message m = consumer.receive(1000l); + assertNotNull("Message " + i + " is not received", m); + } + } + + private TabularData getChannelsDataWithRetry(final ManagedConnection mBean) + throws IOException, JMException + { + TabularData channelsData = mBean.channels(); + int retries = 0; + while(channelsData.size() == 0 && retries < 5) + { + sleep(); + channelsData = mBean.channels(); + retries++; + } + return channelsData; + } + + private void sleep() + { + try + { + Thread.sleep(50); + } + catch (InterruptedException ie) + { + Thread.currentThread().interrupt(); + } + }} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java new file mode 100644 index 0000000000..47b38381c5 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java @@ -0,0 +1,480 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest.management.jmx; + +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; +import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.server.logging.AbstractTestLogging; +import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject; +import org.apache.qpid.test.utils.JMXTestUtils; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.management.JMException; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Test class to test if any change in the broker JMX code is affesting the management console + * There are some hardcoding of management feature names and parameter names to create a customized + * look in the console. + */ +public class ManagementActorLoggingTest extends AbstractTestLogging +{ + private JMXTestUtils _jmxUtils; + private boolean _closed = false; + + @Override + public void setUp() throws Exception + { + _jmxUtils = new JMXTestUtils(this); + _jmxUtils.setUp(); + super.setUp(); + _jmxUtils.open(); + } + + @Override + public void tearDown() throws Exception + { + if(!_closed) + { + _jmxUtils.close(); + } + super.tearDown(); + } + + /** + * Description: + * When a connected client has its connection closed via the Management Console this will be logged as a CON-1002 message. + * Input: + * + * 1. Running Broker + * 2. Connected Client + * 3. Connection is closed via Management Console + * Output: + * + * <date> CON-1002 : Close + * + * Validation Steps: + * 4. The CON ID is correct + * 5. This must be the last CON message for the Connection + * 6. It must be preceded by a CON-1001 for this Connection + * + * @throws Exception - {@see ManagedConnection.closeConnection and #getConnection} + * @throws java.io.IOException - if there is a problem reseting the log monitor + */ + public void testConnectionCloseViaManagement() throws IOException, Exception + { + //Create a connection to the broker + Connection connection = getConnection(); + + // Monitor the connection for an exception being thrown + // this should be a DisconnectionException but it is not this tests + // job to valiate that. Only use the exception as a synchronisation + // to check the log file for the Close message + final CountDownLatch exceptionReceived = new CountDownLatch(1); + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + //Failover being attempted. + exceptionReceived.countDown(); + } + }); + + //Remove the connection close from any 0-10 connections + _monitor.markDiscardPoint(); + + // Get a managedConnection + ManagedConnection mangedConnection = _jmxUtils.getManagedObject(ManagedConnection.class, "org.apache.qpid:type=VirtualHost.Connection,*"); + + //Close the connection + mangedConnection.closeConnection(); + + //Wait for the connection to close + assertTrue("Timed out waiting for conneciton to report close", + exceptionReceived.await(2, TimeUnit.SECONDS)); + + //Validate results + List<String> results = waitAndFindMatches("CON-1002"); + + assertEquals("Unexpected Connection Close count", 1, results.size()); + } + + /** + * Description: + * Exchange creation is possible from the Management Console. + * When an exchanged is created in this way then a EXH-1001 create message + * is expected to be logged. + * Input: + * + * 1. Running broker + * 2. Connected Management Console + * 3. Exchange Created via Management Console + * Output: + * + * EXH-1001 : Create : [Durable] Type:<value> Name:<value> + * + * Validation Steps: + * 4. The EXH ID is correct + * 5. The correct tags are present in the message based on the create options + * + * @throws java.io.IOException - if there is a problem reseting the log monitor + * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.deleteQueue} + */ + public void testCreateExchangeDirectTransientViaManagementConsole() throws IOException, JMException + { + _monitor.markDiscardPoint(); + + _jmxUtils.createExchange("test", getName(), "direct", false); + + // Validate + + //1 - ID is correct + List<String> results = waitAndFindMatches("EXH-1001"); + + assertEquals("More than one exchange creation found", 1, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct exchange name + assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName())); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + } + + public void testCreateExchangeTopicTransientViaManagementConsole() throws IOException, JMException + { + //Remove any previous exchange declares + _monitor.markDiscardPoint(); + + _jmxUtils.createExchange("test", getName(), "topic", false); + + // Validate + + //1 - ID is correct + List<String> results = waitAndFindMatches("EXH-1001"); + + assertEquals("More than one exchange creation found", 1, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct exchange name + assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName())); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + + } + + public void testCreateExchangeFanoutTransientViaManagementConsole() throws IOException, JMException + { + //Remove any previous exchange declares + _monitor.markDiscardPoint(); + + _jmxUtils.createExchange("test", getName(), "fanout", false); + + // Validate + + //1 - ID is correct + List<String> results = waitAndFindMatches("EXH-1001"); + + assertEquals("More than one exchange creation found", 1, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct exchange name + assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName())); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + + } + + public void testCreateExchangeHeadersTransientViaManagementConsole() throws IOException, JMException + { + //Remove any previous exchange declares + _monitor.markDiscardPoint(); + + _jmxUtils.createExchange("test", getName(), "headers", false); + + // Validate + + //1 - ID is correct + List<String> results = waitAndFindMatches("EXH-1001"); + + assertEquals("More than one exchange creation found", 1, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct exchange name + assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName())); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + + } + + /** + * Description: + * Queue creation is possible from the Management Console. When a queue is created in this way then a QUE-1001 create message is expected to be logged. + * Input: + * + * 1. Running broker + * 2. Connected Management Console + * 3. Queue Created via Management Console + * Output: + * + * <date> QUE-1001 : Create : Transient Owner:<name> + * + * Validation Steps: + * 4. The QUE ID is correct + * 5. The correct tags are present in the message based on the create options + * + * @throws java.io.IOException - if there is a problem reseting the log monitor + * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.deleteQueue} + */ + public void testCreateQueueTransientViaManagementConsole() throws IOException, JMException + { + //Remove any previous queue declares + _monitor.markDiscardPoint(); + + _jmxUtils.createQueue("test", getName(), null, false); + + // Validate + + List<String> results = waitAndFindMatches("QUE-1001"); + + assertEquals("More than one queue creation found", 1, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct queue name + String subject = fromSubject(log); + assertEquals("Incorrect queue name created", getName(), AbstractTestLogSubject.getSlice("qu", subject)); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + } + + /** + * Description: + * The ManagementConsole can be used to delete a queue. When this is done a QUE-1002 Deleted message must be logged. + * Input: + * + * 1. Running Broker + * 2. Queue created on the broker with no subscribers + * 3. Management Console connected + * 4. Queue is deleted via Management Console + * Output: + * + * <date> QUE-1002 : Deleted + * + * Validation Steps: + * 5. The QUE ID is correct + * + * @throws java.io.IOException - if there is a problem reseting the log monitor + * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.deleteQueue} + */ + public void testQueueDeleteViaManagementConsole() throws IOException, JMException + { + //Remove any previous queue declares + _monitor.markDiscardPoint(); + + _jmxUtils.createQueue("test", getName(), null, false); + + ManagedBroker managedBroker = _jmxUtils.getManagedBroker("test"); + + managedBroker.deleteQueue(getName()); + + List<String> results = waitAndFindMatches("QUE-1002"); + + assertEquals("More than one queue deletion found", 1, results.size()); + + String log = getLog(results.get(0)); + + // Validate correct binding + String subject = fromSubject(log); + assertEquals("Incorrect queue named in delete", getName(), AbstractTestLogSubject.getSlice("qu", subject)); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + + } + + /** + * Description: + * The binding of a Queue and an Exchange is done via a Binding. When this Binding is created via the Management Console a BND-1001 Create message will be logged. + * Input: + * + * 1. Running Broker + * 2. Connected Management Console + * 3. Use Management Console to perform binding + * Output: + * + * <date> BND-1001 : Create + * + * Validation Steps: + * 4. The BND ID is correct + * 5. This will be the first message for the given binding + * + * @throws java.io.IOException - if there is a problem reseting the log monitor + * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.createNewBinding} + */ + public void testBindingCreateOnDirectViaManagementConsole() throws IOException, JMException + { + //Remove any previous queue declares + _monitor.markDiscardPoint(); + + _jmxUtils.createQueue("test", getName(), null, false); + + ManagedExchange managedExchange = _jmxUtils.getManagedExchange("amq.direct"); + + managedExchange.createNewBinding(getName(), getName()); + + List<String> results = waitAndFindMatches("BND-1001"); + + assertEquals("Unexpected number of bindings logged", 2, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct binding + String subject = fromSubject(log); + assertEquals("Incorrect queue named in create", getName(), AbstractTestLogSubject.getSlice("qu", subject)); + assertEquals("Incorrect routing key in create", getName(), AbstractTestLogSubject.getSlice("rk", subject)); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + } + + public void testBindingCreateOnTopicViaManagementConsole() throws IOException, JMException + { + //Remove any previous queue declares + _monitor.markDiscardPoint(); + + _jmxUtils.createQueue("test", getName(), null, false); + + ManagedExchange managedExchange = _jmxUtils.getManagedExchange("amq.topic"); + + managedExchange.createNewBinding(getName(), getName()); + + List<String> results = waitAndFindMatches("BND-1001"); + + assertEquals("Unexpected number of bindings logged", 2, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct binding + String subject = fromSubject(log); + assertEquals("Incorrect queue named in create", getName(), AbstractTestLogSubject.getSlice("qu", subject)); + assertEquals("Incorrect routing key in create", getName(), AbstractTestLogSubject.getSlice("rk", subject)); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + } + + public void testBindingCreateOnFanoutViaManagementConsole() throws IOException, JMException + { + //Remove any previous queue declares + _monitor.markDiscardPoint(); + + _jmxUtils.createQueue("test", getName(), null, false); + + ManagedExchange managedExchange = _jmxUtils.getManagedExchange("amq.fanout"); + + managedExchange.createNewBinding(getName(), getName()); + + List<String> results = waitAndFindMatches("BND-1001"); + + assertEquals("Unexpected number of bindings logged", 2, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct binding + String subject = fromSubject(log); + assertEquals("Incorrect queue named in create", getName(), AbstractTestLogSubject.getSlice("qu", subject)); + assertEquals("Incorrect routing key in create", getName(), AbstractTestLogSubject.getSlice("rk", subject)); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + } + + /** + * Description: + * Bindings can be deleted so that a queue can be rebound with a different set of values. This can be performed via the Management Console + * Input: + * + * 1. Running Broker + * 2. Management Console connected + * 3. Management Console is used to perform unbind. + * Output: + * + * <date> BND-1002 : Deleted + * + * Validation Steps: + * 4. The BND ID is correct + * 5. There must have been a BND-1001 Create message first. + * 6. This will be the last message for the given binding + * + * @throws java.io.IOException - if there is a problem reseting the log monitor or an issue with the JMX Connection + * @throws javax.management.JMException - {@see #createExchange and ManagedBroker.unregisterExchange} + */ + public void testUnRegisterExchangeViaManagementConsole() throws IOException, JMException + { + //Remove any previous queue declares + _monitor.markDiscardPoint(); + + _jmxUtils.createExchange("test", getName(), "direct", false); + + ManagedBroker managedBroker = _jmxUtils.getManagedBroker("test"); + + managedBroker.unregisterExchange(getName()); + + List<String> results = waitAndFindMatches("EXH-1002"); + + assertEquals("More than one exchange deletion found", 1, results.size()); + + String log = getLog(results.get(0)); + + // Validate correct binding + String subject = fromSubject(log); + assertEquals("Incorrect exchange named in delete", "direct/" + getName(), AbstractTestLogSubject.getSlice("ex", subject)); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + } + +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java new file mode 100644 index 0000000000..6100d5a23e --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java @@ -0,0 +1,317 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest.management.jmx; + + +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.logging.AbstractTestLogging; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.util.LogMonitor; + +import java.io.File; +import java.util.List; + +/** + * Management Console Test Suite + * + * The Management Console test suite validates that the follow log messages as specified in the Functional Specification. + * + * This suite of tests validate that the management console messages occur correctly and according to the following format: + * + * MNG-1001 : Startup + * MNG-1002 : Starting : <service> : Listening on port <Port> + * MNG-1003 : Shutting down : <service> : port <Port> + * MNG-1004 : Ready + * MNG-1005 : Stopped + * MNG-1006 : Using SSL Keystore : <path> + * MNG-1007 : Open : User <username> + * MNG-1008 : Close : User <username> + */ +public class ManagementLoggingTest extends AbstractTestLogging +{ + private static final String MNG_PREFIX = "MNG-"; + + public void setUp() throws Exception + { + setLogMessagePrefix(); + + // We either do this here or have a null check in tearDown. + // As when this test is run against profiles other than java it will NPE + _monitor = new LogMonitor(_outputFile); + //We explicitly do not call super.setUp as starting up the broker is + //part of the test case. + + } + + /** + * Description: + * Using the startup configuration validate that the management startup + * message is logged correctly. + * Input: + * Standard configuration with management enabled + * Output: + * + * <date> MNG-1001 : Startup + * + * Constraints: + * This is the FIRST message logged by MNG + * Validation Steps: + * + * 1. The BRK ID is correct + * 2. This is the FIRST message logged by MNG + */ + public void testManagementStartupEnabled() throws Exception + { + // This test only works on java brokers + if (isJavaBroker()) + { + startBrokerAndCreateMonitor(true, false); + + // Ensure we have received the MNG log msg. + waitForMessage("MNG-1001"); + + List<String> results = findMatches(MNG_PREFIX); + // Validation + + assertTrue("MNGer message not logged", results.size() > 0); + + String log = getLogMessage(results, 0); + + //1 + validateMessageID("MNG-1001", log); + + //2 + //There will be 2 copies of the startup message (one via SystemOut, and one via Log4J) + results = findMatches("MNG-1001"); + assertEquals("Unexpected startup message count.", + 2, results.size()); + + //3 + assertEquals("Startup log message is not 'Startup'.", "Startup", + getMessageString(log)); + } + } + + /** + * Description: + * Verify that when management is disabled in the configuration file the + * startup message is not logged. + * Input: + * Standard configuration with management disabled + * Output: + * NO MNG messages + * Validation Steps: + * + * 1. Validate that no MNG messages are produced. + */ + public void testManagementStartupDisabled() throws Exception + { + if (isJavaBroker()) + { + startBrokerAndCreateMonitor(false, false); + + List<String> results = findMatches(MNG_PREFIX); + // Validation + + assertEquals("MNGer messages logged", 0, results.size()); + } + } + + /** + * The two MNG-1002 messages are logged at the same time so lets test them + * at the same time. + * + * Description: + * Using the default configuration validate that the RMI Registry socket is + * correctly reported as being opened + * + * Input: + * The default configuration file + * Output: + * + * <date> MESSAGE MNG-1002 : Starting : RMI Registry : Listening on port 8999 + * + * Constraints: + * The RMI ConnectorServer and Registry log messages do not have a prescribed order + * Validation Steps: + * + * 1. The MNG ID is correct + * 2. The specified port is the correct '8999' + * + * Description: + * Using the default configuration validate that the RMI ConnectorServer + * socket is correctly reported as being opened + * + * Input: + * The default configuration file + * Output: + * + * <date> MESSAGE MNG-1002 : Starting : RMI ConnectorServer : Listening on port 9099 + * + * Constraints: + * The RMI ConnectorServer and Registry log messages do not have a prescribed order + * Validation Steps: + * + * 1. The MNG ID is correct + * 2. The specified port is the correct '9099' + */ + public void testManagementStartupRMIEntries() throws Exception + { + if (isJavaBroker()) + { + startBrokerAndCreateMonitor(true, false); + + List<String> results = waitAndFindMatches("MNG-1002"); + // Validation + + //There will be 4 startup messages (two via SystemOut, and two via Log4J) + assertEquals("Unexpected MNG-1002 message count", 4, results.size()); + + String log = getLogMessage(results, 0); + + //1 + validateMessageID("MNG-1002", log); + + //Check the RMI Registry port is as expected + int mPort = getManagementPort(getPort()); + assertTrue("RMI Registry port not as expected(" + mPort + ").:" + getMessageString(log), + getMessageString(log).endsWith(String.valueOf(mPort))); + + log = getLogMessage(results, 2); + + //1 + validateMessageID("MNG-1002", log); + + // We expect the RMI Registry port (the defined 'management port') to be + // 100 lower than the JMX RMIConnector Server Port (the actual JMX server) + int jmxPort = mPort + ServerConfiguration.JMXPORT_CONNECTORSERVER_OFFSET; + assertTrue("JMX RMIConnectorServer port not as expected(" + jmxPort + ").:" + getMessageString(log), + getMessageString(log).endsWith(String.valueOf(jmxPort))); + } + } + + /** + * Description: + * Using the default configuration with SSL enabled for the management port the SSL Keystore path should be reported via MNG-1006 + * Input: + * Management SSL enabled default configuration. + * Output: + * + * <date> MESSAGE MNG-1006 : Using SSL Keystore : test_resources/ssl/keystore.jks + * + * Validation Steps: + * + * 1. The MNG ID is correct + * 2. The keystore path is as specified in the configuration + */ + public void testManagementStartupSSLKeystore() throws Exception + { + if (isJavaBroker()) + { + startBrokerAndCreateMonitor(true, true); + + List<String> results = waitAndFindMatches("MNG-1006"); + + assertTrue("MNGer message not logged", results.size() > 0); + + String log = getLogMessage(results, 0); + + //1 + validateMessageID("MNG-1006", log); + + // Validate we only have two MNG-1002 (one via stdout, one via log4j) + results = findMatches("MNG-1006"); + assertEquals("Upexpected SSL Keystore message count", + 2, results.size()); + + // Validate the keystore path is as expected + assertTrue("SSL Keystore entry expected.:" + getMessageString(log), + getMessageString(log).endsWith(new File(getConfigurationStringProperty("management.ssl.keyStorePath")).getName())); + } + } + + /** + * Description: Tests the management connection open/close are logged correctly. + * + * Output: + * + * <date> MESSAGE MNG-1007 : Open : User <username> + * <date> MESSAGE MNG-1008 : Close : User <username> + * + * Validation Steps: + * + * 1. The MNG ID is correct + * 2. The message and username are correct + */ + public void testManagementUserOpenClose() throws Exception + { + if (isJavaBroker()) + { + startBrokerAndCreateMonitor(true, false); + + final JMXTestUtils jmxUtils = new JMXTestUtils(this); + List<String> openResults = null; + List<String> closeResults = null; + try + { + jmxUtils.setUp(); + jmxUtils.open(); + openResults = waitAndFindMatches("MNG-1007"); + } + finally + { + if (jmxUtils != null) + { + jmxUtils.close(); + closeResults = waitAndFindMatches("MNG-1008"); + } + } + + assertNotNull("Management Open results null", openResults.size()); + assertEquals("Management Open logged unexpected number of times", 1, openResults.size()); + + assertNotNull("Management Close results null", closeResults.size()); + assertEquals("Management Close logged unexpected number of times", 1, closeResults.size()); + + final String openMessage = getMessageString(getLogMessage(openResults, 0)); + assertTrue("Unexpected open message " + openMessage, openMessage.endsWith("Open : User admin")); + final String closeMessage = getMessageString(getLogMessage(closeResults, 0)); + assertTrue("Unexpected close message " + closeMessage, closeMessage.endsWith("Close : User admin")); + } + } + + private void startBrokerAndCreateMonitor(boolean managementEnabled, boolean useManagementSSL) throws Exception + { + //Ensure management is on + setConfigurationProperty("management.enabled", String.valueOf(managementEnabled)); + + if(useManagementSSL) + { + // This test requires we have an ssl connection + setConfigurationProperty("management.ssl.enabled", "true"); + } + + startBroker(); + + // Now we can create the monitor as _outputFile will now be defined + _monitor = new LogMonitor(_outputFile); + } +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java new file mode 100644 index 0000000000..ad6777d0ea --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -0,0 +1,609 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.systest.management.jmx; + +import org.apache.commons.lang.time.FastDateFormat; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.NotificationCheckTest; +import org.apache.qpid.server.queue.SimpleAMQQueueTest; +import org.apache.qpid.test.client.destination.AddressBasedDestinationTest; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; +import javax.management.Notification; +import javax.management.NotificationListener; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests the JMX API for the Managed Queue. + * + */ +public class QueueManagementTest extends QpidBrokerTestCase +{ + + private static final Logger LOGGER = Logger.getLogger(QueueManagementTest.class); + + private static final String VIRTUAL_HOST = "test"; + private static final String TEST_QUEUE_DESCRIPTION = "my description"; + + private JMXTestUtils _jmxUtils; + private Connection _connection; + private Session _session; + + private String _sourceQueueName; + private String _destinationQueueName; + private Destination _sourceQueue; + private Destination _destinationQueue; + private ManagedQueue _managedSourceQueue; + private ManagedQueue _managedDestinationQueue; + + + public void setUp() throws Exception + { + _jmxUtils = new JMXTestUtils(this); + _jmxUtils.setUp(); + + super.setUp(); + _sourceQueueName = getTestQueueName() + "_src"; + _destinationQueueName = getTestQueueName() + "_dest"; + + _connection = getConnection(); + _connection.start(); + + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + _sourceQueue = _session.createQueue(_sourceQueueName); + _destinationQueue = _session.createQueue(_destinationQueueName); + createQueueOnBroker(_sourceQueue); + createQueueOnBroker(_destinationQueue); + + _jmxUtils.open(); + + _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName); + _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName); + } + + public void tearDown() throws Exception + { + if (_jmxUtils != null) + { + _jmxUtils.close(); + } + super.tearDown(); + } + + public void testQueueAttributes() throws Exception + { + Queue queue = _session.createQueue(getTestQueueName()); + createQueueOnBroker(queue); + + final String queueName = queue.getQueueName(); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals("Unexpected name", queueName, managedQueue.getName()); + assertEquals("Unexpected queue type", "standard", managedQueue.getQueueType()); + } + + public void testExclusiveQueueHasJmsClientIdAsOwner() throws Exception + { + Queue tmpQueue = _session.createTemporaryQueue(); + + final String queueName = tmpQueue.getQueueName(); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertNotNull(_connection.getClientID()); + assertEquals("Unexpected owner", _connection.getClientID(), managedQueue.getOwner()); + } + + public void testNonExclusiveQueueHasNoOwner() throws Exception + { + Queue nonExclusiveQueue = _session.createQueue(getTestQueueName()); + createQueueOnBroker(nonExclusiveQueue); + + final String queueName = nonExclusiveQueue.getQueueName(); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertNull("Unexpected owner", managedQueue.getOwner()); + } + + public void testSetNewQueueDescriptionOnExistingQueue() throws Exception + { + Queue queue = _session.createQueue(getTestQueueName()); + createQueueOnBroker(queue); + + final String queueName = queue.getQueueName(); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertNull("Unexpected description", managedQueue.getDescription()); + + managedQueue.setDescription(TEST_QUEUE_DESCRIPTION); + assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription()); + } + + public void testNewQueueWithDescription() throws Exception + { + String queueName = getTestQueueName(); + Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION); + ((AMQSession<?, ?>)_session).createQueue(AMQShortString.valueOf(queueName), false, true, false, arguments); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription()); + } + + /** + * Requires persistent store. + */ + public void testQueueDescriptionSurvivesRestart() throws Exception + { + String queueName = getTestQueueName(); + Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION); + + ((AMQSession<?, ?>)_session).createQueue(AMQShortString.valueOf(queueName), false, true, false, arguments); + + ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription()); + + restartBroker(); + + managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription()); + } + + /** + * Tests queue creation with {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument. Also tests + * that the attribute is exposed correctly through {@link ManagedQueue#getMaximumDeliveryCount()}. + */ + public void testCreateQueueWithMaximumDeliveryCountSet() throws Exception + { + final String queueName = getName(); + final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + + final Integer deliveryCount = 1; + final Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, (Object)deliveryCount); + managedBroker.createNewQueue(queueName, null, true, arguments); + + // Ensure the queue exists + assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName("test", queueName)); + assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName)); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals("Unexpected maximum delivery count", deliveryCount, managedQueue.getMaximumDeliveryCount()); + } + + /** + * Requires 0-10 as relies on ADDR addresses. + * @see AddressBasedDestinationTest for the testing of message routing to the alternate exchange + */ + public void testGetSetAlternateExchange() throws Exception + { + String queueName = getTestQueueName(); + String altExchange = "amq.fanout"; + String addrWithAltExch = String.format("ADDR:%s;{create:always,node:{type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName, altExchange); + Queue queue = _session.createQueue(addrWithAltExch); + + createQueueOnBroker(queue); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals("Newly created queue does not have expected alternate exchange", altExchange, managedQueue.getAlternateExchange()); + + String newAltExch = "amq.topic"; + managedQueue.setAlternateExchange(newAltExch); + assertEquals("Unexpected alternate exchange after set", newAltExch, managedQueue.getAlternateExchange()); + } + + /** + * Requires 0-10 as relies on ADDR addresses. + */ + public void testRemoveAlternateExchange() throws Exception + { + String queueName = getTestQueueName(); + String altExchange = "amq.fanout"; + String addrWithAltExch = String.format("ADDR:%s;{create:always,node:{type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName, altExchange); + Queue queue = _session.createQueue(addrWithAltExch); + + createQueueOnBroker(queue); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals("Newly created queue does not have expected alternate exchange", altExchange, managedQueue.getAlternateExchange()); + + managedQueue.setAlternateExchange(""); + assertNull("Unexpected alternate exchange after set", managedQueue.getAlternateExchange()); + } + + /** + * Requires persistent store + * Requires 0-10 as relies on ADDR addresses. + */ + public void testAlternateExchangeSurvivesRestart() throws Exception + { + String queueName1 = getTestQueueName() + "1"; + String altExchange1 = "amq.fanout"; + String addr1WithAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName1, altExchange1); + Queue queue1 = _session.createQueue(addr1WithAltExch); + + String queueName2 = getTestQueueName() + "2"; + String addr2WithoutAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue,}}", queueName2); + Queue queue2 = _session.createQueue(addr2WithoutAltExch); + + createQueueOnBroker(queue1); + createQueueOnBroker(queue2); + + ManagedQueue managedQueue1 = _jmxUtils.getManagedQueue(queueName1); + assertEquals("Newly created queue1 does not have expected alternate exchange", altExchange1, managedQueue1.getAlternateExchange()); + + ManagedQueue managedQueue2 = _jmxUtils.getManagedQueue(queueName2); + assertNull("Newly created queue2 does not have expected alternate exchange", managedQueue2.getAlternateExchange()); + + String altExchange2 = "amq.fanout"; + managedQueue2.setAlternateExchange(altExchange2); + + restartBroker(); + + managedQueue1 = _jmxUtils.getManagedQueue(queueName1); + assertEquals("Queue1 does not have expected alternate exchange after restart", altExchange1, managedQueue1.getAlternateExchange()); + + managedQueue2 = _jmxUtils.getManagedQueue(queueName2); + assertEquals("Queue2 does not have expected updated alternate exchange after restart", altExchange2, managedQueue2.getAlternateExchange()); + } + + /** + * Tests the ability to receive queue alerts as JMX notifications. + * + * @see NotificationCheckTest + * @see SimpleAMQQueueTest#testNotificationFiredAsync() + * @see SimpleAMQQueueTest#testNotificationFiredOnEnqueue() + */ + public void testQueueNotification() throws Exception + { + final String queueName = getName(); + final long maximumMessageCount = 3; + + Queue queue = _session.createQueue(queueName); + createQueueOnBroker(queue); + + ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + managedQueue.setMaximumMessageCount(maximumMessageCount); + + RecordingNotificationListener listener = new RecordingNotificationListener(1); + + _jmxUtils.addNotificationListener(_jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName), listener, null, null); + + // Send two messages - this should *not* trigger the notification + sendMessage(_session, queue, 2); + + assertEquals("Premature notification received", 0, listener.getNumberOfNotificationsReceived()); + + // A further message should trigger the message count alert + sendMessage(_session, queue, 1); + + listener.awaitExpectedNotifications(5, TimeUnit.SECONDS); + + assertEquals("Unexpected number of JMX notifications received", 1, listener.getNumberOfNotificationsReceived()); + + Notification notification = listener.getLastNotification(); + assertEquals("Unexpected notification message", "MESSAGE_COUNT_ALERT 3: Maximum count on queue threshold (3) breached.", notification.getMessage()); + } + + /** + * Tests {@link ManagedQueue#viewMessages(long, long)} interface. + */ + public void testViewSingleMessage() throws Exception + { + final List<Message> sentMessages = sendMessage(_session, _sourceQueue, 1); + syncSession(_session); + final Message sentMessage = sentMessages.get(0); + + assertEquals("Unexpected queue depth", 1, _managedSourceQueue.getMessageCount().intValue()); + + // Check the contents of the message + final TabularData tab = _managedSourceQueue.viewMessages(1l, 1l); + assertEquals("Unexpected number of rows in table", 1, tab.size()); + final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator(); + + final CompositeData row1 = rowItr.next(); + assertNotNull("Message should have AMQ message id", row1.get(ManagedQueue.MSG_AMQ_ID)); + assertEquals("Unexpected queue position", 1l, row1.get(ManagedQueue.MSG_QUEUE_POS)); + assertEquals("Unexpected redelivered flag", Boolean.FALSE, row1.get(ManagedQueue.MSG_REDELIVERED)); + + // Check the contents of header (encoded in a string array) + final String[] headerArray = (String[]) row1.get(ManagedQueue.MSG_HEADER); + assertNotNull("Expected message header array", headerArray); + final Map<String, String> headers = headerArrayToMap(headerArray); + + final String expectedJMSMessageID = isBroker010() ? sentMessage.getJMSMessageID().replace("ID:", "") : sentMessage.getJMSMessageID(); + final String expectedFormattedJMSTimestamp = FastDateFormat.getInstance(ManagedQueue.JMSTIMESTAMP_DATETIME_FORMAT).format(sentMessage.getJMSTimestamp()); + assertEquals("Unexpected JMSMessageID within header", expectedJMSMessageID, headers.get("JMSMessageID")); + assertEquals("Unexpected JMSPriority within header", String.valueOf(sentMessage.getJMSPriority()), headers.get("JMSPriority")); + assertEquals("Unexpected JMSTimestamp within header", expectedFormattedJMSTimestamp, headers.get("JMSTimestamp")); + } + + /** + * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface. + */ + public void testMoveMessagesBetweenQueues() throws Exception + { + final int numberOfMessagesToSend = 10; + + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Move first three messages to destination + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(2); + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after first move", 3, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after first move", 7, _managedSourceQueue.getMessageCount().intValue()); + + // Now move a further two messages to destination + fromMessageId = amqMessagesIds.get(7); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second move", 5, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second move", 5, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); + } + + /** + * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. + */ + public void testCopyMessagesBetweenQueues() throws Exception + { + final int numberOfMessagesToSend = 10; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Copy first three messages to destination + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(2); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after first copy", 3, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after first copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + // Now copy a further two messages to destination + fromMessageId = amqMessagesIds.get(7); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second copy", 5, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); + } + + public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception + { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); + Connection asyncConnection = getConnection(); + asyncConnection.start(); + + final int numberOfMessagesToSend = 50; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1); + + CountDownLatch consumerReadToHalfwayLatch = new CountDownLatch(numberOfMessagesToSend / 2); + AtomicInteger totalConsumed = new AtomicInteger(0); + startAsyncConsumerOn(_sourceQueue, asyncConnection, consumerReadToHalfwayLatch, totalConsumed); + + boolean halfwayPointReached = consumerReadToHalfwayLatch.await(5000, TimeUnit.MILLISECONDS); + assertTrue("Did not read half of messages within time allowed", halfwayPointReached); + + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + + asyncConnection.stop(); + + // The exact number of messages moved will be non deterministic, as the number of messages processed + // by the consumer cannot be predicted. There is also the possibility that a message can remain + // on the source queue. This situation will arise if a message has been acquired by the consumer, but not + // yet delivered to the client application (i.e. MessageListener#onMessage()) when the Connection#stop() occurs. + // + // The number of messages moved + the number consumed + any messages remaining on source should + // *always* be equal to the number we originally sent. + + int numberOfMessagesReadByConsumer = totalConsumed.intValue(); + int numberOfMessagesOnDestinationQueue = _managedDestinationQueue.getMessageCount().intValue(); + int numberOfMessagesRemainingOnSourceQueue = _managedSourceQueue.getMessageCount().intValue(); + + LOGGER.debug("Async consumer read : " + numberOfMessagesReadByConsumer + + " Number of messages moved to destination : " + numberOfMessagesOnDestinationQueue + + " Number of messages remaining on source : " + numberOfMessagesRemainingOnSourceQueue); + assertEquals("Unexpected number of messages after move", numberOfMessagesToSend, numberOfMessagesReadByConsumer + numberOfMessagesOnDestinationQueue + numberOfMessagesRemainingOnSourceQueue); + } + + public void testMoveMessagesBetweenQueuesWithActiveConsumerOnDestinationQueue() throws Exception + { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); + Connection asyncConnection = getConnection(); + asyncConnection.start(); + + final int numberOfMessagesToSend = 50; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1); + + AtomicInteger totalConsumed = new AtomicInteger(0); + CountDownLatch allMessagesConsumedLatch = new CountDownLatch(numberOfMessagesToSend); + startAsyncConsumerOn(_destinationQueue, asyncConnection, allMessagesConsumedLatch, totalConsumed); + + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + + allMessagesConsumedLatch.await(5000, TimeUnit.MILLISECONDS); + assertEquals("Did not consume all messages from destination queue", numberOfMessagesToSend, totalConsumed.intValue()); + } + + private void startAsyncConsumerOn(Destination queue, Connection asyncConnection, + final CountDownLatch requiredNumberOfMessagesRead, final AtomicInteger totalConsumed) throws Exception + { + Session session = asyncConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(new MessageListener() + { + + @Override + public void onMessage(Message arg0) + { + totalConsumed.incrementAndGet(); + requiredNumberOfMessagesRead.countDown(); + } + }); + } + + private void assertMessageIndicesOn(Destination queue, int... expectedIndices) throws Exception + { + MessageConsumer consumer = _session.createConsumer(queue); + + for (int i : expectedIndices) + { + Message message = consumer.receive(1000); + assertNotNull("Expected message with index " + i, message); + assertEquals("Expected message with index " + i, i, message.getIntProperty(INDEX)); + } + + assertNull("Unexpected message encountered", consumer.receive(1000)); + } + + private List<Long> getAMQMessageIdsOn(ManagedQueue managedQueue, long startIndex, long endIndex) throws Exception + { + final SortedSet<Long> messageIds = new TreeSet<Long>(); + + final TabularData tab = managedQueue.viewMessages(startIndex, endIndex); + final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator(); + while(rowItr.hasNext()) + { + final CompositeData row = rowItr.next(); + long amqMessageId = (Long)row.get(ManagedQueue.MSG_AMQ_ID); + messageIds.add(amqMessageId); + } + + return new ArrayList<Long>(messageIds); + } + + /** + * + * Utility method to convert array of Strings in the form x = y into a + * map with key/value x => y. + * + */ + private Map<String,String> headerArrayToMap(final String[] headerArray) + { + final Map<String, String> headerMap = new HashMap<String, String>(); + final List<String> headerList = Arrays.asList(headerArray); + for (Iterator<String> iterator = headerList.iterator(); iterator.hasNext();) + { + final String nameValuePair = iterator.next(); + final String[] nameValue = nameValuePair.split(" *= *", 2); + headerMap.put(nameValue[0], nameValue[1]); + } + return headerMap; + } + + private void createQueueOnBroker(Destination destination) throws JMSException + { + _session.createConsumer(destination).close(); // Create a consumer only to cause queue creation + } + + private void syncSession(Session session) throws Exception + { + ((AMQSession<?,?>)session).sync(); + } + + private final class RecordingNotificationListener implements NotificationListener + { + private final CountDownLatch _notificationReceivedLatch; + private final AtomicInteger _numberOfNotifications; + private final AtomicReference<Notification> _lastNotification; + + private RecordingNotificationListener(int expectedNumberOfNotifications) + { + _notificationReceivedLatch = new CountDownLatch(expectedNumberOfNotifications); + _numberOfNotifications = new AtomicInteger(0); + _lastNotification = new AtomicReference<Notification>(); + } + + @Override + public void handleNotification(Notification notification, Object handback) + { + _lastNotification.set(notification); + _numberOfNotifications.incrementAndGet(); + _notificationReceivedLatch.countDown(); + } + + public int getNumberOfNotificationsReceived() + { + return _numberOfNotifications.get(); + } + + public Notification getLastNotification() + { + return _lastNotification.get(); + } + + public void awaitExpectedNotifications(long timeout, TimeUnit timeunit) throws InterruptedException + { + _notificationReceivedLatch.await(timeout, timeunit); + } + } + +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/StatisticsTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/StatisticsTest.java new file mode 100644 index 0000000000..c3fff94923 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/StatisticsTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest.management.jmx; + +import java.util.List; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; +import org.apache.qpid.management.common.mbeans.ServerInformation; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class StatisticsTest extends QpidBrokerTestCase +{ + private static final String TEST_USER = "admin"; + private static final String TEST_PASSWORD = "admin"; + private static final int MESSAGE_COUNT_TEST = 5; + private static final int MESSAGE_COUNT_DEV = 9; + + private JMXTestUtils _jmxUtils; + private Connection _test1, _dev; + private Session _testSession, _developmentSession; + private Queue _developmentQueue, _testQueue; + protected String _brokerUrl; + + @Override + public void setUp() throws Exception + { + _jmxUtils = new JMXTestUtils(this, TEST_USER, TEST_PASSWORD); + _jmxUtils.setUp(); + + super.setUp(); + + _brokerUrl = getBroker().toString(); + _test1 = new AMQConnection(_brokerUrl, TEST_USER, TEST_PASSWORD, "clientid", "test"); + _dev = new AMQConnection(_brokerUrl, TEST_USER, TEST_PASSWORD, "clientid", "development"); + _test1.start(); + _dev.start(); + + _testSession = _test1.createSession(true, Session.SESSION_TRANSACTED); + _developmentSession = _dev.createSession(true, Session.SESSION_TRANSACTED); + + _developmentQueue = _developmentSession.createQueue(getTestQueueName()); + _testQueue = _testSession.createQueue(getTestQueueName()); + + //Create queues by opening and closing consumers + final MessageConsumer testConsumer = _testSession.createConsumer(_testQueue); + testConsumer.close(); + final MessageConsumer developmentConsumer = _developmentSession.createConsumer(_developmentQueue); + developmentConsumer.close(); + + _jmxUtils.open(); + } + + @Override + public void tearDown() throws Exception + { + _jmxUtils.close(); + + super.tearDown(); + } + + public void testInitialStatisticValues() throws Exception + { + //Check initial values + checkSingleConnectionOnVHostStatistics("test", 0, 0, 0, 0); + checkVHostStatistics("test", 0, 0, 0, 0); + checkSingleConnectionOnVHostStatistics("development", 0, 0, 0, 0); + checkVHostStatistics("development", 0, 0, 0, 0); + checkBrokerStatistics(0, 0, 0, 0); + } + + public void testSendOnSingleVHost() throws Exception + { + sendMessagesAndSync(_testSession, _testQueue, MESSAGE_COUNT_TEST); + + //Check values + checkSingleConnectionOnVHostStatistics("test", MESSAGE_COUNT_TEST, 0, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, 0); + checkVHostStatistics("test", MESSAGE_COUNT_TEST, 0, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, 0); + checkSingleConnectionOnVHostStatistics("development", 0, 0, 0, 0); + checkVHostStatistics("development", 0, 0, 0, 0); + checkBrokerStatistics(MESSAGE_COUNT_TEST, 0, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, 0); + } + + public void testSendOnTwoVHosts() throws Exception + { + sendMessagesAndSync(_testSession, _testQueue, MESSAGE_COUNT_TEST); + sendMessagesAndSync(_developmentSession, _developmentQueue, MESSAGE_COUNT_DEV); + + //Check values + checkSingleConnectionOnVHostStatistics("test", MESSAGE_COUNT_TEST, 0, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, 0); + checkVHostStatistics("test", MESSAGE_COUNT_TEST, 0, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, 0); + checkSingleConnectionOnVHostStatistics("development", MESSAGE_COUNT_DEV, 0, MESSAGE_COUNT_DEV * DEFAULT_MESSAGE_SIZE, 0); + checkVHostStatistics("development", MESSAGE_COUNT_DEV, 0, MESSAGE_COUNT_DEV * DEFAULT_MESSAGE_SIZE, 0); + checkBrokerStatistics(MESSAGE_COUNT_TEST + MESSAGE_COUNT_DEV, 0, (MESSAGE_COUNT_TEST + MESSAGE_COUNT_DEV) * DEFAULT_MESSAGE_SIZE, 0); + } + + public void testSendAndConsumeOnSingleVHost() throws Exception + { + sendMessagesAndSync(_testSession, _testQueue, MESSAGE_COUNT_TEST); + consumeMessages(_testSession, _testQueue, MESSAGE_COUNT_TEST); + + //Check values + checkSingleConnectionOnVHostStatistics("test", MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE); + checkVHostStatistics("test", MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE); + checkSingleConnectionOnVHostStatistics("development", 0, 0, 0, 0); + checkVHostStatistics("development", 0, 0, 0, 0); + checkBrokerStatistics(MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE); + } + + public void testSendAndConsumeOnTwoVHosts() throws Exception + { + sendMessagesAndSync(_testSession, _testQueue, MESSAGE_COUNT_TEST); + sendMessagesAndSync(_developmentSession, _developmentQueue, MESSAGE_COUNT_DEV); + consumeMessages(_testSession, _testQueue, MESSAGE_COUNT_TEST); + consumeMessages(_developmentSession, _developmentQueue, MESSAGE_COUNT_DEV); + + //Check values + checkSingleConnectionOnVHostStatistics("test", MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE); + checkVHostStatistics("test", MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE); + checkSingleConnectionOnVHostStatistics("development", MESSAGE_COUNT_DEV, MESSAGE_COUNT_DEV, MESSAGE_COUNT_DEV * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_DEV * DEFAULT_MESSAGE_SIZE); + checkVHostStatistics("development", MESSAGE_COUNT_DEV, MESSAGE_COUNT_DEV, MESSAGE_COUNT_DEV * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_DEV * DEFAULT_MESSAGE_SIZE); + checkBrokerStatistics(MESSAGE_COUNT_TEST + MESSAGE_COUNT_DEV, MESSAGE_COUNT_TEST + MESSAGE_COUNT_DEV, (MESSAGE_COUNT_TEST + MESSAGE_COUNT_DEV) * DEFAULT_MESSAGE_SIZE, (MESSAGE_COUNT_TEST + MESSAGE_COUNT_DEV) * DEFAULT_MESSAGE_SIZE); + } + + private void sendMessagesAndSync(Session session, Queue queue, int numberOfMessages) throws Exception + { + //Send messages via connection on and sync + sendMessage(session, queue, numberOfMessages); + ((AMQSession<?,?>)session).sync(); + } + + private void consumeMessages(Session session, Queue queue, int numberOfMessages) throws Exception + { + //consume the messages on the virtual host + final MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0 ; i < numberOfMessages ; i++) + { + assertNotNull("an expected message was not received", consumer.receive(1500)); + } + session.commit(); + consumer.close(); + } + + private void checkSingleConnectionOnVHostStatistics(String vHostName, long messagesSent, long messagesReceived, long dataSent, long dataReceived) + { + List<ManagedConnection> managedConnections = _jmxUtils.getManagedConnections(vHostName); + assertEquals(1, managedConnections.size()); + + ManagedConnection managedConnection = managedConnections.get(0); + + assertEquals(messagesSent, managedConnection.getTotalMessagesReceived()); + assertEquals(messagesReceived, managedConnection.getTotalMessagesDelivered()); + + assertEquals(dataSent, managedConnection.getTotalDataReceived()); + assertEquals(dataReceived, managedConnection.getTotalDataDelivered()); + } + + private void checkVHostStatistics(String vHostName, long messagesSent, long messagesReceived, long dataSent, long dataReceived) + { + ManagedBroker vhost = _jmxUtils.getManagedBroker(vHostName); + + assertEquals(messagesSent, vhost.getTotalMessagesReceived()); + assertEquals(messagesReceived, vhost.getTotalMessagesDelivered()); + + assertEquals(dataSent, vhost.getTotalDataReceived()); + assertEquals(dataReceived, vhost.getTotalDataDelivered()); + } + + private void checkBrokerStatistics(long messagesSent, long messagesReceived, long dataSent, long dataReceived) + { + ServerInformation broker = _jmxUtils.getServerInformation(); + + assertEquals(messagesSent, broker.getTotalMessagesReceived()); + assertEquals(messagesReceived, broker.getTotalMessagesDelivered()); + + assertEquals(dataSent, broker.getTotalDataReceived()); + assertEquals(dataReceived, broker.getTotalDataDelivered()); + } +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/UserManagementTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/UserManagementTest.java new file mode 100644 index 0000000000..62b1b554a9 --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/UserManagementTest.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.systest.management.jmx; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; + +import javax.jms.Connection; +import javax.jms.JMSException; + +import org.apache.qpid.management.common.mbeans.UserManagement; +import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.tools.security.Passwd; + +/** + * System test for User Management. + * + */ +public class UserManagementTest extends QpidBrokerTestCase +{ + private static final String TEST_NEWPASSWORD = "newpassword"; + private static final String TEST_PASSWORD = "password"; + private JMXTestUtils _jmxUtils; + private String _testUserName; + private File _passwordFile; + private UserManagement _userManagement; + private Passwd _passwd; + + public void setUp() throws Exception + { + _passwd = createPasswordEncodingUtility(); + _passwordFile = createTemporaryPasswordFileWithJmxAdminUser(); + + setConfigurationProperty("security.pd-auth-manager.principal-database.class", getPrincipalDatabaseImplClass().getName()); + setConfigurationProperty("security.pd-auth-manager.principal-database.attributes.attribute.name", "passwordFile"); + setConfigurationProperty("security.pd-auth-manager.principal-database.attributes.attribute.value", _passwordFile.getAbsolutePath()); + + _jmxUtils = new JMXTestUtils(this); + _jmxUtils.setUp(); + + super.setUp(); + _jmxUtils.open(); + + _testUserName = getTestName() + System.currentTimeMillis(); + + _userManagement = _jmxUtils.getUserManagement(); + } + + + public void tearDown() throws Exception + { + try + { + if (_jmxUtils != null) + { + _jmxUtils.close(); + } + } + finally + { + super.tearDown(); + } + } + + public void testCreateUser() throws Exception + { + final int initialNumberOfUsers = _userManagement.viewUsers().size(); + assertFileDoesNotContainsPasswordForUser(_testUserName); + + boolean success = _userManagement.createUser(_testUserName, TEST_PASSWORD); + assertTrue("Should have been able to create new user " + _testUserName, success); + assertEquals("Unexpected number of users after add", initialNumberOfUsers + 1, _userManagement.viewUsers().size()); + + assertFileContainsPasswordForUser(_testUserName); + } + + public void testJmsLoginForNewUser() throws Exception + { + assertJmsConnectionFails(_testUserName, TEST_PASSWORD); + testCreateUser(); + + assertJmsConnectionSucceeds(_testUserName, TEST_PASSWORD); + } + + public void testDeleteUser() throws Exception + { + final int initialNumberOfUsers = _userManagement.viewUsers().size(); + + testCreateUser(); + + boolean success = _userManagement.deleteUser(_testUserName); + assertTrue("Should have been able to delete new user " + _testUserName, success); + assertEquals("Unexpected number of users after delete", initialNumberOfUsers, _userManagement.viewUsers().size()); + assertFileDoesNotContainsPasswordForUser(_testUserName); + } + + public void testJmsLoginNotPossibleForDeletedUser() throws Exception + { + testDeleteUser(); + + assertJmsConnectionFails(_testUserName, TEST_PASSWORD); + } + + public void testSetPassword() throws Exception + { + testCreateUser(); + + _userManagement.setPassword(_testUserName, TEST_NEWPASSWORD); + + assertFileContainsPasswordForUser(_testUserName); + } + + public void testJmsLoginForPasswordChangedUser() throws Exception + { + testSetPassword(); + + assertJmsConnectionSucceeds(_testUserName, TEST_NEWPASSWORD); + assertJmsConnectionFails(_testUserName, TEST_PASSWORD); + } + + public void testReload() throws Exception + { + writePasswordFile(_passwordFile, JMXTestUtils.DEFAULT_USERID, JMXTestUtils.DEFAULT_PASSWORD, _testUserName, TEST_PASSWORD); + + assertJmsConnectionFails(_testUserName, TEST_PASSWORD); + + _userManagement.reloadData(); + + assertJmsConnectionSucceeds(_testUserName, TEST_PASSWORD); + } + + protected Passwd createPasswordEncodingUtility() + { + return new Passwd() + { + @Override + public String getOutput(String username, String password) + { + return username + ":" + password; + } + }; + } + + protected Class<? extends PrincipalDatabase> getPrincipalDatabaseImplClass() + { + return PlainPasswordFilePrincipalDatabase.class; + } + + private File createTemporaryPasswordFileWithJmxAdminUser() throws Exception + { + File passwordFile = File.createTempFile("passwd", "pwd"); + passwordFile.deleteOnExit(); + writePasswordFile(passwordFile, JMXTestUtils.DEFAULT_USERID, JMXTestUtils.DEFAULT_PASSWORD); + return passwordFile; + } + + private void writePasswordFile(File passwordFile, String... userNamePasswordPairs) throws Exception + { + FileWriter writer = null; + try + { + writer = new FileWriter(passwordFile); + for (int i = 0; i < userNamePasswordPairs.length; i=i+2) + { + String username = userNamePasswordPairs[i]; + String password = userNamePasswordPairs[i+1]; + writer.append(_passwd.getOutput(username, password) + "\n"); + } + } + finally + { + writer.close(); + } + } + + + private void assertFileContainsPasswordForUser(String username) throws IOException + { + assertTrue("Could not find password for user " + username + " within " + _passwordFile, passwordFileContainsUser(username)); + } + + private void assertFileDoesNotContainsPasswordForUser(String username) throws IOException + { + assertFalse("Could not find password for user " + username + " within " + _passwordFile, passwordFileContainsUser(username)); + } + + private boolean passwordFileContainsUser(String username) throws IOException + { + BufferedReader reader = null; + try + { + reader = new BufferedReader(new FileReader(_passwordFile)); + String line = reader.readLine(); + while(line != null) + { + if (line.startsWith(username)) + { + return true; + } + line = reader.readLine(); + } + + return false; + } + finally + { + reader.close(); + } + } + + private void assertJmsConnectionSucceeds(String username, String password) throws Exception + { + Connection connection = getConnection(username, password); + assertNotNull(connection); + } + + private void assertJmsConnectionFails(String username, String password) throws Exception + { + try + { + getConnection(username, password); + fail("Exception not thrown"); + } + catch (JMSException e) + { + // PASS + } + } +} diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/UserManagementWithBase64MD5PasswordsTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/UserManagementWithBase64MD5PasswordsTest.java new file mode 100644 index 0000000000..84a66232ce --- /dev/null +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/UserManagementWithBase64MD5PasswordsTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.systest.management.jmx; + +import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.qpid.tools.security.Passwd; + +public class UserManagementWithBase64MD5PasswordsTest extends UserManagementTest +{ + @Override + protected Passwd createPasswordEncodingUtility() + { + return new Passwd(); + } + + @Override + protected Class<? extends PrincipalDatabase> getPrincipalDatabaseImplClass() + { + return Base64MD5PasswordFilePrincipalDatabase.class; + } + +} |
