diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2010-05-20 15:18:59 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2010-05-20 15:18:59 +0000 |
| commit | 875b9b35146a0e1c9da2c4de7c1dcf0a45698f52 (patch) | |
| tree | 8b7eb916374d2fc8109a88119a313be962efee91 /java | |
| parent | 4f641bd0c0deb1eda0524d8b73bc295b160817c4 (diff) | |
| download | qpid-python-875b9b35146a0e1c9da2c4de7c1dcf0a45698f52.tar.gz | |
QPID-2622 : Add Closeable interface and update Broker components to use it and add close method in ApplicationRegistry to safely perform the close.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@946667 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
15 files changed, 141 insertions, 97 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index ef7426c814..8bb1a6b9fa 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -22,6 +22,7 @@ package org.apache.qpid.qmf; import org.apache.qpid.AMQException; +import org.apache.qpid.common.Closeable; import org.apache.qpid.qmf.schema.BrokerSchema; import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -34,7 +35,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -public class QMFService implements ConfigStore.ConfigEventListener +public class QMFService implements ConfigStore.ConfigEventListener, Closeable { diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 7b50a2e3ad..69bdf94621 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.server.connection; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.log4j.Logger; +import org.apache.qpid.common.Closeable; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionException; @@ -29,16 +30,11 @@ import org.apache.qpid.protocol.AMQConstant; import java.util.concurrent.CopyOnWriteArrayList; import java.util.List; -public class ConnectionRegistry implements IConnectionRegistry +public class ConnectionRegistry implements IConnectionRegistry, Closeable { private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>(); - private VirtualHost _virtualHost; - - public ConnectionRegistry(VirtualHost virtualHost) - { - _virtualHost = virtualHost; - } + private Logger _logger = Logger.getLogger(ConnectionRegistry.class); public void initialise() { @@ -54,17 +50,24 @@ public class ConnectionRegistry implements IConnectionRegistry } /** Close all of the currently open connections. */ - public void close() throws AMQException + public void close() { while (!_registry.isEmpty()) { AMQProtocolSession connection = _registry.get(0); - connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down", - 0, 0, - connection.getProtocolOutputConverter().getProtocolMajorVersion(), - connection.getProtocolOutputConverter().getProtocolMinorVersion(), - (Throwable) null), true); + try + { + connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down", + 0, 0, + connection.getProtocolOutputConverter().getProtocolMajorVersion(), + connection.getProtocolOutputConverter().getProtocolMinorVersion(), + (Throwable) null), true); + } + catch (AMQException e) + { + _logger.warn("Error closing connection:" + e.getMessage()); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java index b58b17ba86..fda80ad0dd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.management; import javax.management.JMException; import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.common.Closeable; import java.rmi.RemoteException; import java.io.IOException; @@ -39,13 +40,11 @@ import java.io.IOException; * be the obvious choice for managed objects. * */ -public interface ManagedObjectRegistry +public interface ManagedObjectRegistry extends Closeable { void start() throws IOException, ConfigurationException; void registerObject(ManagedObject managedObject) throws JMException; void unregisterObject(ManagedObject managedObject) throws JMException; - - void close() throws RemoteException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java index b4fbed6948..a048e75b2e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java @@ -53,7 +53,7 @@ public class NoopManagedObjectRegistry implements ManagedObjectRegistry { } - public void close() throws RemoteException + public void close() { } diff --git a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java index 924570f00d..c1eff5f8db 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -24,6 +24,7 @@ import org.apache.felix.framework.Felix; import org.apache.felix.framework.util.FelixConstants; import org.apache.felix.framework.util.StringMap; import org.apache.felix.main.AutoProcessor; +import org.apache.qpid.common.Closeable; import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.security.access.ACLPlugin; @@ -50,7 +51,7 @@ import java.util.Map; * Provides access to pluggable elements, such as exchanges */ -public class PluginManager +public class PluginManager implements Closeable { private ServiceTracker _exchangeTracker = null; private ServiceTracker _securityTracker = null; diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 727e902a0c..d339304ab6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.registry; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.common.Closeable; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.qmf.QMFService; import org.apache.qpid.server.configuration.BrokerConfig; @@ -339,73 +340,52 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } - public void close() throws Exception + /** + * Close non-null Closeable items and log any errors + * @param close + */ + private void close(Closeable close) { - if (_logger.isInfoEnabled()) + try { - _logger.info("Shutting down ApplicationRegistry:" + this); + if (close != null) + { + close.close(); + } } - - try + catch (Throwable e) { - //Stop incoming connections - unbind(); + _logger.error("Error thrown whilst closing " + close.getClass().getSimpleName(), e); } - finally + } + + + public void close() + { + if (_logger.isInfoEnabled()) { - try - { -// Replace with this -// _virtualHostRegistry.close(); + _logger.info("Shutting down ApplicationRegistry:" + this); + } - //Shutdown virtualhosts - for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts()) - { - virtualHost.close(); - } - } - finally - { -// _accessManager.close(); + //Stop incoming connections + unbind(); + + //Shutdown virtualhosts + close(_virtualHostRegistry); + +// close(_accessManager); // -// _databaseManager.close(); +// close(_databaseManager); - try - { - _authenticationManager.close(); - } - finally - { - try - { - // close the rmi registry(if any) started for management - if (_managedObjectRegistry != null) - { - _managedObjectRegistry.close(); - } - } - finally - { - try - { - _qmfService.close(); - } - finally - { - try - { - _pluginManager.close(); - } - finally - { - CurrentActor.get().message(BrokerMessages.BRK_STOPPED()); - } - } - } + close(_authenticationManager); - } - } - } + close(_managedObjectRegistry); + + close(_qmfService); + + close(_pluginManager); + + CurrentActor.get().message(BrokerMessages.BRK_STOPPED()); } private void unbind() @@ -415,8 +395,17 @@ public abstract class ApplicationRegistry implements IApplicationRegistry for (InetSocketAddress bindAddress : _acceptors.keySet()) { QpidAcceptor acceptor = _acceptors.get(bindAddress); - acceptor.getNetworkDriver().close(); - CurrentActor.get().message(BrokerMessages.BRK_SHUTTING_DOWN(acceptor.toString(), bindAddress.getPort())); + + try + { + acceptor.getNetworkDriver().close(); + } + catch (Throwable e) + { + _logger.error("Unable to close network driver due to:" + e.getMessage()); + } + + CurrentActor.get().message(BrokerMessages.BRK_SHUTTING_DOWN(acceptor.toString(), bindAddress.getPort())); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index 5dd7520b8b..ae18cc0edb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -40,14 +40,13 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry @Override - public void close() throws Exception + public void close() { //Set the Actor for Broker Shutdown CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger)); try { super.close(); - _qmfService.close(); } finally { diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index 1fa16099c8..62dddeda92 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -53,9 +53,8 @@ public interface IApplicationRegistry /** * Shutdown this Registry - * @throws Exception - //fixme needs to be made more specific */ - void close() throws Exception; + void close(); /** * Get the low level configuration. For use cases where the configured object approach is not required diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java index d1803124a7..d34d0c4d27 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java @@ -20,19 +20,18 @@ */ package org.apache.qpid.server.security.auth.manager; +import org.apache.qpid.common.Closeable; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.security.auth.AuthenticationResult; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -public interface AuthenticationManager +public interface AuthenticationManager extends Closeable { String getMechanisms(); SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException; AuthenticationResult authenticate(SaslServer server, byte[] response); - - void close(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index a5d75d8574..9e1c07863c 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.virtualhost; +import org.apache.qpid.common.Closeable; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -42,7 +43,7 @@ import java.util.UUID; import java.util.TimerTask; import java.util.concurrent.FutureTask; -public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig +public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable { IConnectionRegistry getConnectionRegistry(); @@ -66,7 +67,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo ACLManager getAccessManager(); - void close() throws Exception; + void close(); ManagedObject getManagedObject(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 413ebe159e..4a2f796d2b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -217,7 +217,7 @@ public class VirtualHostImpl implements Accessable, VirtualHost _virtualHostMBean = new VirtualHostMBean(); - _connectionRegistry = new ConnectionRegistry(this); + _connectionRegistry = new ConnectionRegistry(); _houseKeepingTasks = new ScheduledThreadPoolExecutor(_configuration.getHouseKeepingThreadCount()); @@ -607,9 +607,8 @@ public class VirtualHostImpl implements Accessable, VirtualHost return _accessManager; } - public void close() throws Exception + public void close() { - //Stop Connections _connectionRegistry.close(); @@ -627,16 +626,32 @@ public class VirtualHostImpl implements Accessable, VirtualHost { _houseKeepingTasks.shutdown(); - if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) + try + { + if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) + { + _houseKeepingTasks.shutdownNow(); + } + } + catch (InterruptedException e) { - _houseKeepingTasks.shutdownNow(); + _logger.warn("Interrupted during Housekeeping shutdown:" + e.getMessage()); + // Swallowing InterruptedException ok as we are shutting down. } } //Close MessageStore if (_messageStore != null) { - _messageStore.close(); + //Remove MessageStore Interface should not throw Exception + try + { + _messageStore.close(); + } + catch (Exception e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } } CurrentActor.get().message(VirtualHostMessages.VHT_CLOSED()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java index 5975eeec3d..4b140f1ca7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java @@ -20,6 +20,7 @@ */
package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.configuration.ConfigStore;
@@ -29,7 +30,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap;
-public class VirtualHostRegistry
+public class VirtualHostRegistry implements Closeable
{
private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String, VirtualHost>();
@@ -91,4 +92,13 @@ public class VirtualHostRegistry {
return _applicationRegistry.getConfigStore();
}
+
+ public void close()
+ {
+ for (VirtualHost virtualHost : getVirtualHosts())
+ {
+ virtualHost.close();
+ }
+
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java index d24119f0d0..9f00ce3cad 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -94,7 +94,7 @@ public class NullApplicationRegistry extends ApplicationRegistry @Override - public void close() throws Exception + public void close() { try { diff --git a/java/common/src/main/java/org/apache/qpid/common/Closeable.java b/java/common/src/main/java/org/apache/qpid/common/Closeable.java new file mode 100644 index 0000000000..45a98b5843 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/common/Closeable.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.common; + + +public interface Closeable +{ + public void close(); +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java index 17546b4d39..5b7670eaa3 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -524,11 +524,12 @@ public class QpidTestCase extends TestCase } catch (Exception e) { + _logger.error("Broker initialise failed due to:",e); try { registry.close(); } - catch (Exception closeE) + catch (Throwable closeE) { closeE.printStackTrace(); } |
