diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2008-07-04 16:28:53 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2008-07-04 16:28:53 +0000 |
| commit | f15d072783f7cfe756cffe2a49dbf3c29fb44d8a (patch) | |
| tree | 013222f24ad9e1f5bb292842e7f03df452239749 /qpid/java | |
| parent | 697360cf3bb8b76eca97705634d5008c230175d8 (diff) | |
| download | qpid-python-f15d072783f7cfe756cffe2a49dbf3c29fb44d8a.tar.gz | |
QPID-871 - Added a ConnectionRegistry per Virtualhost to track the open connections.
Altered the ApplicationRegistry so that when the shutdown hook is fired it:
Unbinds from the listening sockets
Then closes each virtualhost which in turn closes all the active TCP connections before closing the MessageStore thus preventing any logged errors occuring as a result of the active TCP connection performing an action on the closed store.
Test provided MessageStoreShutdownTest which uses the new InternalBrokerBaseCase and InternalTestProtocolSession classes to perform system testing of the Broker without TCP framing or client codebase.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@674085 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
16 files changed, 902 insertions, 164 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index 41d7f6c067..8ad2ace1b2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -34,7 +34,6 @@ import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.common.FixedSizeByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; @@ -421,7 +420,8 @@ public class Main bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port); } - acceptor.bind(bindAddress, handler, sconfig); + bind(acceptor, bindAddress, handler, sconfig); + //fixme qpid.AMQP should be using qpidproperties to get value _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress); } @@ -432,7 +432,8 @@ public class Main try { - acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig); + bind(acceptor, new InetSocketAddress(connectorConfig.sslPort), handler, sconfig); + //fixme qpid.AMQP should be using qpidproperties to get value _brokerLogger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort); @@ -455,6 +456,23 @@ public class Main } } + /** + * Ensure that any bound Acceptors are recorded in the registry so they can be closed later. + * + * @param acceptor + * @param bindAddress + * @param handler + * @param sconfig + * + * @throws IOException from the acceptor.bind command + */ + private void bind(IoAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException + { + acceptor.bind(bindAddress, handler, sconfig); + + ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor); + } + public static void main(String[] args) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java new file mode 100644 index 0000000000..d287595e2d --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.connection; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.protocol.AMQConstant; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.List; + +public class ConnectionRegistry implements IConnectionRegistry +{ + private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>(); + + private VirtualHost _virtualHost; + + public ConnectionRegistry(VirtualHost virtualHost) + { + _virtualHost = virtualHost; + } + + public void initialise() + { + + } + + /** Close all of the currently open connections. */ + public void close() throws AMQException + { + while (!_registry.isEmpty()) + { + AMQProtocolSession connection = _registry.get(0); + + connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down", + 0, 0, + connection.getProtocolOutputConverter().getProtocolMajorVersion(), + connection.getProtocolOutputConverter().getProtocolMinorVersion(), + (Throwable) null), true); + } + } + + public void registerConnection(AMQProtocolSession connnection) + { + _registry.add(connnection); + } + + public void deregisterConnection(AMQProtocolSession connnection) + { + _registry.remove(connnection); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java new file mode 100644 index 0000000000..d64fde1c20 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.connection; + +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.AMQException; + +public interface IConnectionRegistry +{ + + public void initialise(); + + public void close() throws AMQException; + + public void registerConnection(AMQProtocolSession connnection); + + public void deregisterConnection(AMQProtocolSession connnection); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index bdb16d0fcb..b4075b81ac 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -25,6 +25,7 @@ import org.apache.log4j.Logger; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; +import org.apache.mina.common.CloseFuture; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; @@ -99,7 +100,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private Object _lastSent; - private boolean _closed; + protected boolean _closed; // maximum number of channels this session should have private long _maxNoOfChannels = 1000; @@ -115,13 +116,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private MethodDispatcher _dispatcher; private ProtocolSessionIdentifier _sessionIdentifier; + private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L; + private org.apache.mina.common.WriteFuture _lastWriteFuture; + public ManagedObject getManagedObject() { return _managedObject; } public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory) - throws AMQException + throws AMQException { _stateManager = new AMQStateManager(virtualHostRegistry, this); _minaProtocolSession = session; @@ -145,7 +149,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, - AMQStateManager stateManager) throws AMQException + AMQStateManager stateManager) throws AMQException { _stateManager = stateManager; _minaProtocolSession = session; @@ -199,7 +203,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } private void frameReceived(AMQFrame frame) throws AMQException - { + { int channelId = frame.getChannel(); AMQBody body = frame.getBodyFrame(); @@ -222,15 +226,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { if (_logger.isInfoEnabled()) { - _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); + _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame); } + closeProtocolSession(); return; } } - - try { body.handle(channelId, this); @@ -258,7 +261,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable String locales = "en_US"; - AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(), (short) getProtocolMinorVersion(), null, @@ -266,7 +268,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable locales.getBytes()); _minaProtocolSession.write(responseBody.generateFrame(0)); - } catch (AMQException e) { @@ -332,27 +333,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _logger.info("Closing connection due to: " + e.getMessage()); } - closeSession(); - AMQConnectionException ce = - evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString()); + evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString()); - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(ce.getCloseFrame(channelId)); + closeConnection(channelId, ce, false); } } catch (AMQConnectionException e) { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing connection due to: " + e.getMessage()); - } - - markChannelAwaitingCloseOk(channelId); - closeSession(); - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(channelId)); + closeConnection(channelId, e, false); } } catch (Exception e) @@ -365,7 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _logger.error("Unexpected exception while processing frame. Closing connection.", e); - _minaProtocolSession.close(); + closeProtocolSession(); } } @@ -399,7 +389,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public void writeFrame(AMQDataBlock frame) { _lastSent = frame; - _minaProtocolSession.write(frame); + + _lastWriteFuture = _minaProtocolSession.write(frame); } public AMQShortString getContextKey() @@ -431,7 +422,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public AMQChannel getChannel(int channelId) throws AMQException { final AMQChannel channel = - ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); + ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); if ((channel == null) || channel.isClosing()) { return null; @@ -464,8 +455,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable if (_channelMap.size() == _maxNoOfChannels) { String errorMessage = - toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels - + "); can't create channel"; + toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels + + "); can't create channel"; _logger.error(errorMessage); throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage); } @@ -619,6 +610,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable if (!_closed) { _closed = true; + + if (_virtualHost != null) + { + _virtualHost.getConnectionRegistry().deregisterConnection(this); + } + closeAllChannels(); if (_managedObject != null) { @@ -632,9 +629,54 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } + public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException + { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } + + markChannelAwaitingCloseOk(channelId); + closeSession(); + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(e.getCloseFrame(channelId)); + + if (closeProtocolSession) + { + closeProtocolSession(); + } + } + + public void closeProtocolSession() + { + closeProtocolSession(true); + } + + public void closeProtocolSession(boolean waitLast) + { + _logger.debug("Waiting for last write to join."); + if (waitLast && (_lastWriteFuture != null)) + { + _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); + } + + _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession); + final CloseFuture future = _minaProtocolSession.close(); + future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); + + try + { + _stateManager.changeState(AMQState.CONNECTION_CLOSED); + } + catch (AMQException e) + { + _logger.info(e.getMessage()); + } + } + public String toString() { - return _minaProtocolSession.getRemoteAddress() + "("+(getAuthorizedID() == null ? "?" : getAuthorizedID().getName()+")"); + return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")"); } public String dump() @@ -752,6 +794,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public void setVirtualHost(VirtualHost virtualHost) throws AMQException { _virtualHost = virtualHost; + + _virtualHost.getConnectionRegistry().registerConnection(this); + _managedObject = createMBean(); _managedObject.register(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index c3400029da..1bac601225 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -25,6 +25,7 @@ import javax.security.sasl.SaslServer; import org.apache.qpid.AMQException; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.*; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -150,6 +151,10 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession /** This must be called when the session is _closed in order to free up any resources managed by the session. */ void closeSession() throws AMQException; + /** This must be called to close the session in order to free up any resources managed by the session. */ + void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException; + + /** @return a key that uniquely identifies this session */ Object getKey(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index d0dcd051f0..863f837462 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -24,9 +24,17 @@ import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.management.ManagedObjectRegistry; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.plugins.PluginManager; +import org.apache.mina.common.IoAcceptor; import java.util.HashMap; import java.util.Map; +import java.net.InetSocketAddress; /** * An abstract application registry that provides access to configuration information and handles the @@ -48,6 +56,21 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public static final String DEFAULT_APPLICATION_REGISTRY = "org.apache.qpid.server.util.NullApplicationRegistry"; public static String _APPLICATION_REGISTRY = DEFAULT_APPLICATION_REGISTRY; + protected final Map<InetSocketAddress, IoAcceptor> _acceptors = new HashMap<InetSocketAddress, IoAcceptor>(); + + protected ManagedObjectRegistry _managedObjectRegistry; + + protected AuthenticationManager _authenticationManager; + + protected VirtualHostRegistry _virtualHostRegistry; + + protected ACLPlugin _accessManager; + + protected PrincipalDatabaseManager _databaseManager; + + protected PluginManager _pluginManager; + + static { Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService())); @@ -57,7 +80,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { public void run() { - _logger.info("Shutting down application registries..."); removeAll(); } } @@ -90,6 +112,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } + /** + * Method to cleanly shutdown specified registry running in this JVM + * + * @param instanceID the instance to shutdown + */ + public static void remove(int instanceID) { try @@ -111,8 +139,9 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } + /** Method to cleanly shutdown all registries currently running in this JVM */ public static void removeAll() - { + { Object[] keys = _instanceMap.keySet().toArray(); for (Object k : keys) { @@ -162,6 +191,10 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public void close() throws Exception { + //Stop incomming connections + unbind(); + + //Shutdown virtualhosts for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts()) { virtualHost.close(); @@ -174,11 +207,31 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } + private void unbind() + { + synchronized (_acceptors) + { + for (InetSocketAddress bindAddress : _acceptors.keySet()) + { + IoAcceptor acceptor = _acceptors.get(bindAddress); + acceptor.unbind(bindAddress); + } + } + } + public Configuration getConfiguration() { return _configuration; } + public void addAcceptor(InetSocketAddress bindAddress, IoAcceptor acceptor) + { + synchronized (_acceptors) + { + _acceptors.put(bindAddress, acceptor); + } + } + public <T> T getConfiguredObject(Class<T> instanceType) { T instance = (T) _configuredObjects.get(instanceType); @@ -204,4 +257,35 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { _APPLICATION_REGISTRY = clazz; } + + public VirtualHostRegistry getVirtualHostRegistry() + { + return _virtualHostRegistry; + } + + public ACLPlugin getAccessManager() + { + return _accessManager; + } + + public ManagedObjectRegistry getManagedObjectRegistry() + { + return _managedObjectRegistry; + } + + public PrincipalDatabaseManager getDatabaseManager() + { + return _databaseManager; + } + + public AuthenticationManager getAuthenticationManager() + { + return _authenticationManager; + } + + public PluginManager getPluginManager() + { + return _pluginManager; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index fef958000a..baab56a4a4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -48,18 +48,6 @@ import org.apache.qpid.AMQException; public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { - private ManagedObjectRegistry _managedObjectRegistry; - - private AuthenticationManager _authenticationManager; - - private ACLPlugin _accessManager; - - private PrincipalDatabaseManager _databaseManager; - - private VirtualHostRegistry _virtualHostRegistry; - - private PluginManager _pluginManager; - public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException { @@ -145,39 +133,9 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry } } - - public VirtualHostRegistry getVirtualHostRegistry() - { - return _virtualHostRegistry; - } - - public ACLPlugin getAccessManager() - { - return _accessManager; - } - - public ManagedObjectRegistry getManagedObjectRegistry() - { - return _managedObjectRegistry; - } - - public PrincipalDatabaseManager getDatabaseManager() - { - return _databaseManager; - } - - public AuthenticationManager getAuthenticationManager() - { - return _authenticationManager; - } - public Collection<String> getVirtualHostNames() { return getConfiguration().getList("virtualhosts.virtualhost.name"); } - public PluginManager getPluginManager() - { - return _pluginManager; - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index ca10fbdba2..597ef042f9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.registry; import java.util.Collection; +import java.net.InetSocketAddress; import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.management.ManagedObjectRegistry; @@ -29,6 +30,7 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.mina.common.IoAcceptor; public interface IApplicationRegistry { @@ -39,6 +41,10 @@ public interface IApplicationRegistry */ void initialise() throws Exception; + /** + * Shutdown this Registry + * @throws Exception - //fixme needs to be made more specific + */ void close() throws Exception; /** @@ -71,5 +77,12 @@ public interface IApplicationRegistry ACLPlugin getAccessManager(); PluginManager getPluginManager(); - + + /** + * Register any acceptors for this registry + * @param bindAddress The address that the acceptor has been bound with + * @param acceptor The acceptor in use + */ + void addAcceptor(InetSocketAddress bindAddress, IoAcceptor acceptor); + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index 0acfa84f31..87aa15be84 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -42,19 +42,6 @@ import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public class NullApplicationRegistry extends ApplicationRegistry { - private ManagedObjectRegistry _managedObjectRegistry; - - private AuthenticationManager _authenticationManager; - - private VirtualHostRegistry _virtualHostRegistry; - - private ACLPlugin _accessManager; - - private PrincipalDatabaseManager _databaseManager; - - private PluginManager _pluginManager; - - public NullApplicationRegistry() { super(new MapConfiguration(new HashMap())); @@ -84,47 +71,11 @@ public class NullApplicationRegistry extends ApplicationRegistry } - public Configuration getConfiguration() - { - return _configuration; - } - - - public ManagedObjectRegistry getManagedObjectRegistry() - { - return _managedObjectRegistry; - } - - public PrincipalDatabaseManager getDatabaseManager() - { - return _databaseManager; - } - - public AuthenticationManager getAuthenticationManager() - { - return _authenticationManager; - } - public Collection<String> getVirtualHostNames() { String[] hosts = {"test"}; return Arrays.asList(hosts); } - - public VirtualHostRegistry getVirtualHostRegistry() - { - return _virtualHostRegistry; - } - - public ACLPlugin getAccessManager() - { - return _accessManager; - } - - public PluginManager getPluginManager() - { - return _pluginManager; - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index f69751b708..977bd84491 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -26,6 +26,8 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.server.AMQBrokerManagerMBean; +import org.apache.qpid.server.connection.ConnectionRegistry; +import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.Accessable; @@ -55,6 +57,8 @@ public class VirtualHost implements Accessable private final String _name; + private ConnectionRegistry _connectionRegistry; + private QueueRegistry _queueRegistry; private ExchangeRegistry _exchangeRegistry; @@ -74,7 +78,8 @@ public class VirtualHost implements Accessable private final Timer _houseKeepingTimer; private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L; - + + public void setAccessableName(String name) { _logger.warn("Setting Accessable Name for VirualHost is not allowed. (" @@ -86,6 +91,10 @@ public class VirtualHost implements Accessable return _name; } + public IConnectionRegistry getConnectionRegistry() + { + return _connectionRegistry; + } /** * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any @@ -143,8 +152,8 @@ public class VirtualHost implements Accessable _name = name; _virtualHostMBean = new VirtualHostMBean(); - // This isn't needed to be registered - //_virtualHostMBean.register(); + + _connectionRegistry = new ConnectionRegistry(this); _houseKeepingTimer = new Timer("Queue-housekeeping-"+name, true); _queueRegistry = new DefaultQueueRegistry(this); @@ -283,14 +292,20 @@ public class VirtualHost implements Accessable public ACLPlugin getAccessManager() { return _accessManager; - } + } public void close() throws Exception { + //Stop Housekeeping if (_houseKeepingTimer != null) { _houseKeepingTimer.cancel(); } + + //Stop Connections + _connectionRegistry.close(); + + //Close MessageStore if (_messageStore != null) { _messageStore.close(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java new file mode 100644 index 0000000000..51012bc776 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -0,0 +1,173 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.AMQChannel; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter +{ + // ChannelID(LIST) -> LinkedList<Pair> + final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers; + private AtomicInteger _deliveryCount = new AtomicInteger(0); + + public InternalTestProtocolSession() throws AMQException + { + super(new TestIoSession(), + ApplicationRegistry.getInstance().getVirtualHostRegistry(), + new AMQCodecFactory(true)); + + _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>(); + + } + + public ProtocolOutputConverter getProtocolOutputConverter() + { + return this; + } + + public byte getProtocolMajorVersion() + { + return (byte) 8; + } + + public byte getProtocolMinorVersion() + { + return (byte) 0; + } + + // *** + + public List<DeliveryPair> getDelivers(int channelId, AMQShortString consumerTag, int count) + { + synchronized (_channelDelivers) + { + List<DeliveryPair> msgs = _channelDelivers.get(channelId).get(consumerTag).subList(0, count); + + List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs); + + //Remove the msgs from the receivedList. + msgs.clear(); + + return response; + } + } + + // *** ProtocolOutputConverter Implementation + public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException + { + } + + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) + { + } + + public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException + { + _deliveryCount.incrementAndGet(); + + synchronized (_channelDelivers) + { + Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId); + + if (consumers == null) + { + consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>(); + _channelDelivers.put(channelId, consumers); + } + + LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag); + + if (consumerDelivers == null) + { + consumerDelivers = new LinkedList<DeliveryPair>(); + consumers.put(consumerTag, consumerDelivers); + } + + consumerDelivers.add(new DeliveryPair(deliveryTag, message)); + } + } + + public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException + { + } + + public void awaitDelivery(int msgs) + { + while (msgs > _deliveryCount.get()) + { + try + { + Thread.sleep(100); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + } + + public class DeliveryPair + { + private long _deliveryTag; + private AMQMessage _message; + + public DeliveryPair(long deliveryTag, AMQMessage message) + { + _deliveryTag = deliveryTag; + _message = message; + } + + public AMQMessage getMessage() + { + return _message; + } + + public long getDeliveryTag() + { + return _deliveryTag; + } + } + + public boolean isClosed() + { + return _closed; + } + + public void closeProtocolSession(boolean waitLast) + { + // Override as we don't have a real IOSession to close. + // The alternative is to fully implement the TestIOSession to return a CloseFuture from close(); + // Then the AMQMinaProtocolSession can join on the returning future without a NPE. + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java new file mode 100644 index 0000000000..a695a67eea --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; + +import java.util.List; + +public class MessageStoreShutdownTest extends InternalBrokerBaseCase +{ + + public void test() + { + subscribe(_session, _channel, _queue); + + try + { + publishMessages(_session, _channel, 1); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + fail(e.getMessage()); + } + + try + { + _registry.close(); + } + catch (Exception e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + fail(e.getMessage()); + } + + assertTrue("Session should now be closed", _session.isClosed()); + + + //Test attempting to modify the broker state after session has been closed. + + //The Message should have been removed from the unacked list. + + //Ack Messages + List<InternalTestProtocolSession.DeliveryPair> list = _session.getDelivers(_channel.getChannelId(), new AMQShortString("sgen_1"), 1); + + InternalTestProtocolSession.DeliveryPair pair = list.get(0); + + try + { + // The message should now be requeued and so unable to ack it. + _channel.acknowledgeMessage(pair.getDeliveryTag(), false); + } + catch (AMQException e) + { + assertEquals("Incorrect exception thrown", "Single ack on delivery tag 1 not known for channel:1", e.getMessage()); + } + + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java new file mode 100644 index 0000000000..c6cd5da01d --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -0,0 +1,183 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import junit.framework.TestCase; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.SimpleAMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.ConsumerTagNotUniqueException; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; + +public class InternalBrokerBaseCase extends TestCase +{ + protected IApplicationRegistry _registry; + protected MessageStore _messageStore; + protected AMQChannel _channel; + protected InternalTestProtocolSession _session; + protected VirtualHost _virtualHost; + protected StoreContext _storeContext = new StoreContext(); + protected AMQQueue _queue; + protected AMQShortString QUEUE_NAME; + + public void setUp() throws Exception + { + super.setUp(); + _registry = new TestApplicationRegistry(); + ApplicationRegistry.initialise(_registry); + _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test"); + _messageStore = _virtualHost.getMessageStore(); + + QUEUE_NAME = new AMQShortString("test"); + _queue = AMQQueueFactory.createAMQQueueImpl(QUEUE_NAME, false, new AMQShortString("testowner"), + false, _virtualHost, null); + + _virtualHost.getQueueRegistry().registerQueue(_queue); + + Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); + + _queue.bind(defaultExchange, QUEUE_NAME, null); + + _session = new InternalTestProtocolSession(); + + _session.setVirtualHost(_virtualHost); + + _channel = new AMQChannel(_session, 1, _messageStore); + + _session.addChannel(_channel); + } + + public void tearDown() throws Exception + { + ApplicationRegistry.removeAll(); + super.tearDown(); + } + + protected void checkStoreContents(int messageCount) + { + assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageMetaDataMap().size()); + + //The above publish message is sufficiently small not to fit in the header so no Body is required. + //assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size()); + } + + protected AMQShortString subscribe(InternalTestProtocolSession session, AMQChannel channel, AMQQueue queue) + { + try + { + return channel.subscribeToQueue(null, queue, true, null, false, true); + } + catch (AMQException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + catch (ConsumerTagNotUniqueException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + //Keep the compiler happy + return null; + } + + public void publishMessages(InternalTestProtocolSession session, AMQChannel channel, int messages) throws AMQException + { + MessagePublishInfo info = new MessagePublishInfo() + { + public AMQShortString getExchange() + { + return ExchangeDefaults.DEFAULT_EXCHANGE_NAME; + } + + public void setExchange(AMQShortString exchange) + { + + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return QUEUE_NAME; + } + }; + + for (int count = 0; count < messages; count++) + { + channel.setPublishFrame(info, _virtualHost.getExchangeRegistry().getExchange(info.getExchange())); + + //Set the body size + ContentHeaderBody _headerBody = new ContentHeaderBody(); + _headerBody.bodySize = 0; + + //Set Minimum properties + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + + properties.setExpiration(0L); + properties.setTimestamp(System.currentTimeMillis()); + + //Make Message Persistent + properties.setDeliveryMode((byte) 2); + + _headerBody.properties = properties; + + channel.publishContentHeader(_headerBody); + } + + } + + public void acknowledge(AMQChannel channel, long deliveryTag) + { + try + { + channel.acknowledgeMessage(deliveryTag, false); + } + catch (AMQException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java new file mode 100644 index 0000000000..471912c85a --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.MapConfiguration; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.management.ManagedObjectRegistry; +import org.apache.qpid.server.management.NoopManagedObjectRegistry; +import org.apache.qpid.server.plugins.PluginManager; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.plugins.AllowAll; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; +import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Properties; +import java.util.Arrays; + +public class TestApplicationRegistry extends ApplicationRegistry +{ + private QueueRegistry _queueRegistry; + + private ExchangeRegistry _exchangeRegistry; + + private ExchangeFactory _exchangeFactory; + + private MessageStore _messageStore; + + private VirtualHost _vHost; + + + public TestApplicationRegistry() + { + super(new MapConfiguration(new HashMap())); + } + + public void initialise() throws Exception + { + Properties users = new Properties(); + + users.put("guest", "guest"); + + _databaseManager = new PropertiesPrincipalDatabaseManager("default", users); + + _accessManager = new AllowAll(); + + _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); + + _managedObjectRegistry = new NoopManagedObjectRegistry(); + + _messageStore = new TestableMemoryMessageStore(); + + _virtualHostRegistry = new VirtualHostRegistry(); + + _vHost = new VirtualHost("test", _messageStore); + + _virtualHostRegistry.registerVirtualHost(_vHost); + + _queueRegistry = _vHost.getQueueRegistry(); + _exchangeFactory = _vHost.getExchangeFactory(); + _exchangeRegistry = _vHost.getExchangeRegistry(); + + _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes + } + + public QueueRegistry getQueueRegistry() + { + return _queueRegistry; + } + + public ExchangeRegistry getExchangeRegistry() + { + return _exchangeRegistry; + } + + public ExchangeFactory getExchangeFactory() + { + return _exchangeFactory; + } + + public Collection<String> getVirtualHostNames() + { + String[] hosts = {"test"}; + return Arrays.asList(hosts); + } + + public void setAccessManager(ACLPlugin newManager) + { + _accessManager = newManager; + } + + public MessageStore getMessageStore() + { + return _messageStore; + } + +} + + diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index a1a405c313..14020299f6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.framing.*; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -117,6 +118,10 @@ public class MockProtocolSession implements AMQProtocolSession { } + public void closeConnection(int channelId, AMQConnectionException e, boolean closeIoSession) throws AMQException + { + } + public Object getKey() { return null; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java index 83b4665be6..6b5ab632b0 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -52,15 +52,8 @@ public class TestApplicationRegistry extends ApplicationRegistry private ExchangeFactory _exchangeFactory; - private ManagedObjectRegistry _managedObjectRegistry; - - private ACLPlugin _accessManager; - - private PrincipalDatabaseManager _databaseManager; - - private AuthenticationManager _authenticationManager; - private MessageStore _messageStore; + private VirtualHost _vHost; public TestApplicationRegistry() @@ -92,11 +85,6 @@ public class TestApplicationRegistry extends ApplicationRegistry _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes } - public Configuration getConfiguration() - { - return _configuration; - } - public QueueRegistry getQueueRegistry() { return _queueRegistry; @@ -112,21 +100,6 @@ public class TestApplicationRegistry extends ApplicationRegistry return _exchangeFactory; } - public ManagedObjectRegistry getManagedObjectRegistry() - { - return _managedObjectRegistry; - } - - public PrincipalDatabaseManager getDatabaseManager() - { - return _databaseManager; - } - - public AuthenticationManager getAuthenticationManager() - { - return _authenticationManager; - } - public Collection<String> getVirtualHostNames() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -137,11 +110,6 @@ public class TestApplicationRegistry extends ApplicationRegistry return null; //To change body of implemented methods use File | Settings | File Templates. } - public ACLPlugin getAccessManager() - { - return _accessManager; - } - public void setAccessManager(ACLPlugin newManager) { _accessManager = newManager; |
