diff options
| author | Keith Wall <kwall@apache.org> | 2012-03-30 13:44:25 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-03-30 13:44:25 +0000 |
| commit | 38d1f36fe4238a887f867350adaa56489e53e0e6 (patch) | |
| tree | 1a5504424a30e6fce56e89123c6036bed002d05b /qpid/java/broker | |
| parent | da8070494a06d0b6c37127eb0a3439e394bddd31 (diff) | |
| download | qpid-python-38d1f36fe4238a887f867350adaa56489e53e0e6.tar.gz | |
QPID-3917: Refactor VirtualHost/MessageStore implementations to be ready for BDB-HA
Applied patch from Andrew MacBean <andymacbean@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1307416 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
69 files changed, 1938 insertions, 1296 deletions
diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml index 4dcdcda6d2..1f7f91de9a 100644 --- a/qpid/java/broker/etc/virtualhosts.xml +++ b/qpid/java/broker/etc/virtualhosts.xml @@ -25,8 +25,8 @@ <name>localhost</name> <localhost> <store> - <class>org.apache.qpid.server.store.MemoryMessageStore</class> - <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class> + <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass> + <!--<factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass> <environment-path>${QPID_WORK}/derbystore</environment-path>--> </store> @@ -86,8 +86,8 @@ <name>development</name> <development> <store> - <class>org.apache.qpid.server.store.MemoryMessageStore</class> - <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class> + <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass> + <!--<factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass> <environment-path>${QPID_WORK}/derbystore</environment-path>--> </store> @@ -125,8 +125,8 @@ <name>test</name> <test> <store> - <class>org.apache.qpid.server.store.MemoryMessageStore</class> - <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class> + <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass> + <!--<factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass> <environment-path>${QPID_WORK}/derbystore</environment-path>--> </store> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index b388def86c..7ef06ce0f8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -37,7 +37,6 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueueMBean; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -60,8 +59,6 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr private final QueueRegistry _queueRegistry; private final ExchangeRegistry _exchangeRegistry; private final ExchangeFactory _exchangeFactory; - private final Exchange _defaultExchange; - private final DurableConfigurationStore _durableConfig; private final VirtualHostImpl.VirtualHostMBean _virtualHostMBean; @@ -75,8 +72,6 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr _queueRegistry = virtualHost.getQueueRegistry(); _exchangeRegistry = virtualHost.getExchangeRegistry(); - _defaultExchange = _exchangeRegistry.getDefaultExchange(); - _durableConfig = virtualHost.getMessageStore(); _exchangeFactory = virtualHost.getExchangeFactory(); } @@ -181,7 +176,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr _exchangeRegistry.registerExchange(exchange); if (durable) { - _durableConfig.createExchange(exchange); + getVirtualHost().getMessageStore().createExchange(exchange); } } else @@ -275,10 +270,10 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr false, false, getVirtualHost(), args); if (queue.isDurable() && !queue.isAutoDelete()) { - _durableConfig.createQueue(queue, args); + getVirtualHost().getMessageStore().createQueue(queue, args); } - virtualHost.getBindingFactory().addBinding(queueName, queue, _defaultExchange, null); + virtualHost.getBindingFactory().addBinding(queueName, queue, _exchangeRegistry.getDefaultExchange(), null); } catch (AMQException ex) { @@ -317,7 +312,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr queue.delete(); if (queue.isDurable()) { - _durableConfig.removeQueue(queue); + getVirtualHost().getMessageStore().removeQueue(queue); } } catch (AMQException ex) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java index 250c417ef1..2460be4705 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java @@ -34,7 +34,6 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.BindingMessages; import org.apache.qpid.server.logging.subjects.BindingLogSubject; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collections; @@ -44,38 +43,14 @@ import java.util.concurrent.ConcurrentHashMap; public class BindingFactory { private final VirtualHost _virtualHost; - private final DurableConfigurationStore.Source _configSource; - private final Exchange _defaultExchange; private final ConcurrentHashMap<BindingImpl, BindingImpl> _bindings = new ConcurrentHashMap<BindingImpl, BindingImpl>(); - public BindingFactory(final VirtualHost vhost) { - this(vhost, vhost.getExchangeRegistry().getDefaultExchange()); - } - - public BindingFactory(final DurableConfigurationStore.Source configSource, final Exchange defaultExchange) - { - _configSource = configSource; - _defaultExchange = defaultExchange; - if (configSource instanceof VirtualHost) - { - _virtualHost = (VirtualHost) configSource; - } - else - { - _virtualHost = null; - } - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; + _virtualHost = vhost; } - - private final class BindingImpl extends Binding implements AMQQueue.Task, Exchange.Task, BindingConfig { private final BindingLogSubject _logSubject; @@ -156,30 +131,38 @@ public class BindingFactory private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException { assert queue != null; + final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); + if (bindingKey == null) { bindingKey = ""; } if (exchange == null) { - exchange = _defaultExchange; + exchange = defaultExchange; } if (arguments == null) { arguments = Collections.emptyMap(); } + if (exchange == null) + { + throw new IllegalArgumentException("exchange cannot be null"); + } + // The default exchange bindings must reflect the existence of queues, allow // all operations on it to succeed. It is up to the broker to prevent illegal // attempts at binding to this exchange, not the ACLs. - if(exchange != _defaultExchange) + if(exchange != defaultExchange) { //Perform ACLs - if (!getVirtualHost().getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey))) + if (!_virtualHost.getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey))) { throw new AMQSecurityException("Permission denied: binding " + bindingKey); } } + BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments); BindingImpl existingMapping = _bindings.putIfAbsent(b,b); @@ -192,7 +175,7 @@ public class BindingFactory if (b.isDurable() && !restore) { - _configSource.getMessageStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments)); + _virtualHost.getMessageStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments)); } queue.addQueueDeleteTask(b); @@ -212,7 +195,7 @@ public class BindingFactory private ConfigStore getConfigStore() { - return getVirtualHost().getConfigStore(); + return _virtualHost.getConfigStore(); } public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException @@ -229,13 +212,15 @@ public class BindingFactory public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException { assert queue != null; + final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); + if (bindingKey == null) { bindingKey = ""; } if (exchange == null) { - exchange = _defaultExchange; + exchange = defaultExchange; } if (arguments == null) { @@ -245,10 +230,10 @@ public class BindingFactory // The default exchange bindings must reflect the existence of queues, allow // all operations on it to succeed. It is up to the broker to prevent illegal // attempts at binding to this exchange, not the ACLs. - if(exchange != _defaultExchange) + if(exchange != defaultExchange) { // Check access - if (!getVirtualHost().getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue)) + if (!_virtualHost.getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue)) { throw new AMQSecurityException("Permission denied: unbinding " + bindingKey); } @@ -265,7 +250,7 @@ public class BindingFactory if (b.isDurable()) { - _configSource.getMessageStore().unbindQueue(exchange, + _virtualHost.getMessageStore().unbindQueue(exchange, new AMQShortString(bindingKey), queue, FieldTable.convertToFieldTable(arguments)); @@ -280,13 +265,15 @@ public class BindingFactory public Binding getBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) { assert queue != null; + final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); + if(bindingKey == null) { bindingKey = ""; } if(exchange == null) { - exchange = _defaultExchange; + exchange = defaultExchange; } if(arguments == null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 558311fc46..5f472b6ddd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.MemoryMessageStoreFactory; import java.util.ArrayList; import java.util.Arrays; @@ -102,14 +103,14 @@ public class VirtualHostConfiguration extends ConfigurationPlugin return getConfig().subset("store"); } - public String getMessageStoreClass() + public String getMessageStoreFactoryClass() { - return getStringValue("store.class", MemoryMessageStore.class.getName()); + return getStringValue("store.factoryclass", MemoryMessageStoreFactory.class.getName()); } - public void setMessageStoreClass(String storeClass) + public void setMessageStoreFactoryClass(String storeFactoryClass) { - getConfig().setProperty("store.class", storeClass); + getConfig().setProperty("store.factoryclass", storeFactoryClass); } public List getExchanges() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 9159489299..4a58314f51 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -46,11 +46,19 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable /** Close all of the currently open connections. */ public void close() { - _logger.debug("Closing connection registry :" + _registry.size() + " connections."); + close(IConnectionRegistry.BROKER_SHUTDOWN_REPLY_TEXT); + } + + public void close(final String replyText) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Closing connection registry :" + _registry.size() + " connections."); + } while (!_registry.isEmpty()) { AMQConnectionModel connection = _registry.get(0); - closeConnection(connection, AMQConstant.CONNECTION_FORCED, "Broker is shutting down"); + closeConnection(connection, AMQConstant.CONNECTION_FORCED, replyText); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index 89582e5748..954c448b72 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -28,12 +28,17 @@ import java.util.List; public interface IConnectionRegistry { + public static final String BROKER_SHUTDOWN_REPLY_TEXT = "Broker is shutting down"; + public static final String VHOST_PASSIVATE_REPLY_TEXT = "Virtual host is being passivated"; + public void initialise(); public void close() throws AMQException; - + + public void close(String replyText) throws AMQException; + public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message); - + public List<AMQConnectionModel> getConnections(); public void registerConnection(AMQConnectionModel connnection); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index cae07046fa..af49168a80 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.exchange; -import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.binding.Binding; @@ -125,10 +123,18 @@ public abstract class AbstractExchange implements Exchange, Managable _autoDelete = autoDelete; _ticket = ticket; - // TODO - fix _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); + createAndRegisterMBean(); + _logSubject = new ExchangeLogSubject(this, this.getVirtualHost()); + + // Log Exchange creation + CurrentActor.get().message(ExchangeMessages.CREATED(String.valueOf(getTypeShortString()), String.valueOf(name), durable)); + } + + private void createAndRegisterMBean() + { try { _exchangeMbean = createMBean(); @@ -136,12 +142,8 @@ public abstract class AbstractExchange implements Exchange, Managable } catch (JMException e) { - getLogger().error(e); + throw new RuntimeException("Failed to register mbean",e); } - _logSubject = new ExchangeLogSubject(this, this.getVirtualHost()); - - // Log Exchange creation - CurrentActor.get().message(ExchangeMessages.CREATED(String.valueOf(getTypeShortString()), String.valueOf(name), durable)); } public ConfigStore getConfigStore() @@ -149,8 +151,6 @@ public abstract class AbstractExchange implements Exchange, Managable return getVirtualHost().getConfigStore(); } - public abstract Logger getLogger(); - public boolean isDurable() { return _durable; @@ -324,8 +324,7 @@ public abstract class AbstractExchange implements Exchange, Managable public Map<String, Object> getArguments() { - // TODO - Fix - return Collections.EMPTY_MAP; + return Collections.emptyMap(); } public UUID getId() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index ebe0645bc4..33e73b4668 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; @@ -36,7 +35,7 @@ import java.util.concurrent.ConcurrentMap; public class DefaultExchangeRegistry implements ExchangeRegistry { - private static final Logger _log = Logger.getLogger(DefaultExchangeRegistry.class); + private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class); /** * Maps from exchange name to exchange instance @@ -59,8 +58,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this, getDurableConfigurationStore()); } - - public DurableConfigurationStore getDurableConfigurationStore() { return _host.getMessageStore(); @@ -153,4 +150,28 @@ public class DefaultExchangeRegistry implements ExchangeRegistry } } + @Override + public void clearAndUnregisterMbeans() + { + for (final AMQShortString exchangeName : getExchangeNames()) + { + final Exchange exchange = getExchange(exchangeName); + + if (exchange instanceof AbstractExchange) + { + AbstractExchange abstractExchange = (AbstractExchange) exchange; + try + { + abstractExchange.getManagedObject().unregister(); + } + catch (AMQException e) + { + LOGGER.warn("Failed to unregister mbean", e); + } + } + } + _exchangeMap.clear(); + _exchangeMapStr.clear(); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 8c0a5001db..9525324f57 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -131,12 +131,6 @@ public class DirectExchange extends AbstractExchange return new DirectExchangeMBean(this); } - public Logger getLogger() - { - return _logger; - } - - public List<? extends BaseQueue> doRoute(InboundMessage payload) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java index 4dfcce7bbe..335efaeaa2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java @@ -27,7 +27,8 @@ import org.apache.qpid.server.store.DurableConfigurationStore; public class ExchangeInitialiser { - public void initialise(ExchangeFactory factory, ExchangeRegistry registry, DurableConfigurationStore store) throws AMQException{ + public void initialise(ExchangeFactory factory, ExchangeRegistry registry, DurableConfigurationStore store) throws AMQException + { for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes()) { define (registry, factory, type.getDefaultExchangeName(), type.getName(), store); @@ -44,7 +45,6 @@ public class ExchangeInitialiser { Exchange exchange = f.createExchange(name, type, true, false, 0); r.registerExchange(exchange); - if(exchange.isDurable()) { store.createExchange(exchange); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index 18eb37e037..db244c114b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -51,5 +51,7 @@ public interface ExchangeRegistry Exchange getExchange(String exchangeName); - void unregisterExchange(String exchange, boolean ifUnused) throws ExchangeInUseException, AMQException;; + void unregisterExchange(String exchange, boolean ifUnused) throws ExchangeInUseException, AMQException; + + void clearAndUnregisterMbeans(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 76f86ea1b4..f9ad2fad87 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -52,11 +52,6 @@ public class FanoutExchange extends AbstractExchange return new FanoutExchangeMBean(this); } - public Logger getLogger() - { - return _logger; - } - public static final ExchangeType<FanoutExchange> TYPE = new ExchangeType<FanoutExchange>() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 295a7191e7..2700a7cda3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -231,11 +231,6 @@ public class HeadersExchange extends AbstractExchange return new HeadersExchangeMBean(this); } - public Logger getLogger() - { - return _logger; - } - protected void onBind(final Binding binding) { String bindingKey = binding.getBindingKey(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 27166e4384..7ce84b7a89 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -407,11 +407,6 @@ public class TopicExchange extends AbstractExchange return new TopicExchangeMBean(this); } - public Logger getLogger() - { - return _logger; - } - private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index 79fcfb6d76..eab28ac9d4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.State; import org.apache.qpid.server.virtualhost.VirtualHost; public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody> @@ -82,6 +83,10 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied: '" + virtualHost.getName() + "'"); } + else if (virtualHost.getState() != State.ACTIVE) + { + throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active"); + } session.setVirtualHost(virtualHost); @@ -89,10 +94,10 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con if (session.getContextKey() == null) { session.setContextKey(generateClientID()); - } + } MethodRegistry methodRegistry = session.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost()); + AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost()); stateManager.changeState(AMQState.CONNECTION_OPEN); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties index 3bc5074777..541f8b8c68 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties @@ -18,8 +18,7 @@ # # Default File used for all non-defined locales. -# 0 - name -CREATED = CFG-1001 : Created : {0} +CREATED = CFG-1001 : Created # 0 - path STORE_LOCATION = CFG-1002 : Store location : {0} CLOSE = CFG-1003 : Closed diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties index a2cedeb22a..081f2bbca3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties @@ -18,11 +18,11 @@ # # Default File used for all non-defined locales. # -# 0 - name -CREATED = MST-1001 : Created : {0} +CREATED = MST-1001 : Created # 0 - path STORE_LOCATION = MST-1002 : Store location : {0} CLOSED = MST-1003 : Closed RECOVERY_START = MST-1004 : Recovery Start RECOVERED = MST-1005 : Recovered {0,number} messages -RECOVERY_COMPLETE = MST-1006 : Recovery Complete
\ No newline at end of file +RECOVERY_COMPLETE = MST-1006 : Recovery Complete +PASSIVATE = MST-1007 : Store Passivated diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties index 9ef58df940..b9e87159a6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties @@ -19,8 +19,7 @@ # Default File used for all non-defined locales. # # -# 0 - name -CREATED = TXN-1001 : Created : {0} +CREATED = TXN-1001 : Created # 0 - path STORE_LOCATION = TXN-1002 : Store location : {0} CLOSED = TXN-1003 : Closed diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java index 8f0b9182a9..6f18cbcc6b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java @@ -40,7 +40,8 @@ public class BindingLogSubject extends AbstractLogSubject public BindingLogSubject(String routingKey, Exchange exchange, AMQQueue queue) { - setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(), + setLogStringWithFormat(BINDING_FORMAT, + queue.getVirtualHost().getName(), exchange.getTypeShortString(), exchange.getNameShortString(), queue.getNameShortString(), diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java index 969288be00..08963bd8f1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.STORE_FORMAT; @@ -28,10 +27,9 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.STORE_FOR public class MessageStoreLogSubject extends AbstractLogSubject { - /** Create an ExchangeLogSubject that Logs in the following format. */ - public MessageStoreLogSubject(VirtualHost vhost, MessageStore store) + /** Create an MessageStoreLogSubject that Logs in the following format. */ + public MessageStoreLogSubject(VirtualHost vhost, String messageStoreName) { - setLogStringWithFormat(STORE_FORMAT, vhost.getName(), - store.getClass().getSimpleName()); + setLogStringWithFormat(STORE_FORMAT, vhost.getName(), messageStoreName); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java index d76487073d..801fe55939 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.server.queue; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -29,6 +32,8 @@ import java.util.concurrent.ConcurrentMap; public class DefaultQueueRegistry implements QueueRegistry { + private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class); + private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>(); private final VirtualHost _virtualHost; @@ -72,4 +77,22 @@ public class DefaultQueueRegistry implements QueueRegistry { return getQueue(new AMQShortString(queue)); } + + @Override + public void stopAllAndUnregisterMBeans() + { + for (final AMQQueue queue : getQueues()) + { + queue.stop(); + try + { + queue.getManagedObject().unregister(); + } + catch (AMQException e) + { + LOGGER.warn("Failed to unregister mbean", e); + } + } + _queueMap.clear(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java index 80f6bd1493..1ffc0a3560 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java @@ -40,4 +40,6 @@ public interface QueueRegistry Collection<AMQQueue> getQueues(); AMQQueue getQueue(String queue); + + void stopAllAndUnregisterMBeans(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index ea9621ff41..13c24e624e 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -28,7 +28,6 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; -import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.queue.AMQQueue; public interface DurableConfigurationStore @@ -46,13 +45,11 @@ public interface DurableConfigurationStore * @param name The name to be used by this store * @param recoveryHandler Handler to be called as the store recovers on start up * @param config The apache commons configuration object. - * * @throws Exception If any error occurs that means the store is unable to configure itself. */ void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, - Configuration config, - LogSubject logSubject) throws Exception; + Configuration config) throws Exception; /** * Makes the specified exchange persistent. * diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java new file mode 100644 index 0000000000..95b0186027 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +public enum Event +{ + BEFORE_ACTIVATE, + AFTER_ACTIVATE, + BEFORE_PASSIVATE, + AFTER_PASSIVATE, + BEFORE_CLOSE, + AFTER_CLOSE +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java new file mode 100644 index 0000000000..33ae7b5b24 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +public interface EventListener +{ + public void event(Event event); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java new file mode 100644 index 0000000000..3e10f758f9 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class EventManager +{ + private ConcurrentMap<Event, List<EventListener>> _listeners = new ConcurrentHashMap<Event, List<EventListener>>(); + + public void addEventListener(EventListener listener, Event event) + { + _listeners.putIfAbsent(event, new ArrayList<EventListener>()); + final List<EventListener> list = _listeners.get(event); + list.add(listener); + } + + public void notifyEvent(Event event) + { + if (_listeners.containsKey(event)) + { + for (EventListener listener : _listeners.get(event)) + { + listener.event(event); + } + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index b6a5e80640..de9e73f914 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -20,103 +20,58 @@ */ package org.apache.qpid.server.store; -import org.apache.commons.configuration.Configuration; -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.Bridge; -import org.apache.qpid.server.federation.BrokerLink; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.ConfigStoreMessages; -import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.queue.AMQQueue; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -/** A simple message store that stores the messages in a threadsafe structure in memory. */ -public class MemoryMessageStore implements MessageStore +/** A simple message store that stores the messages in a thread-safe structure in memory. */ +public class MemoryMessageStore extends NullMessageStore { - private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); - - private static final int DEFAULT_HASHTABLE_CAPACITY = 50000; - - private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity"; - - private final AtomicLong _messageId = new AtomicLong(1); - private AtomicBoolean _closed = new AtomicBoolean(false); - private LogSubject _logSubject; + private final AtomicBoolean _closed = new AtomicBoolean(false); private static final Transaction IN_MEMORY_TRANSACTION = new Transaction() { - public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException + @Override + public StoreFuture commitTranAsync() throws AMQStoreException { + return StoreFuture.IMMEDIATE_FUTURE; } - public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException + @Override + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { } - public void commitTran() throws AMQStoreException + @Override + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { } - public StoreFuture commitTranAsync() throws AMQStoreException + @Override + public void commitTran() throws AMQStoreException { - return StoreFuture.IMMEDIATE_FUTURE; } + @Override public void abortTran() throws AMQStoreException { } + @Override public void removeXid(long format, byte[] globalId, byte[] branchId) { } + @Override public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) { } - }; - public void configureConfigStore(String name, ConfigurationRecoveryHandler handler, Configuration configuration, LogSubject logSubject) throws Exception - { - _logSubject = logSubject; - CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName())); - - - } - - public void configureMessageStore(String name, - MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, - Configuration config, LogSubject logSubject) throws Exception - { - if(_logSubject == null) - { - _logSubject = logSubject; - } - int hashtableCapacity = config.getInt(name + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); - _log.info("Using capacity " + hashtableCapacity + " for hash tables"); - CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); - } - - public void close() throws Exception - { - _closed.getAndSet(true); - CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); - - } - + @Override public StoredMessage addMessage(StorableMessageMetaData metaData) { final long id = _messageId.getAndIncrement(); @@ -125,96 +80,21 @@ public class MemoryMessageStore implements MessageStore return message; } - - public void createExchange(Exchange exchange) throws AMQStoreException - { - - } - - public void removeExchange(Exchange exchange) throws AMQStoreException - { - - } - - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException - { - - } - - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException - { - - } - - - public void createQueue(AMQQueue queue) throws AMQStoreException - { - // Not requred to do anything - } - - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException - { - // Not required to do anything - } - - public void removeQueue(final AMQQueue queue) throws AMQStoreException - { - // Not required to do anything - } - - public void updateQueue(final AMQQueue queue) throws AMQStoreException - { - // Not required to do anything - } - - public void createBrokerLink(final BrokerLink link) throws AMQStoreException - { - - } - - public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException - { - - } - - public void createBridge(final Bridge bridge) throws AMQStoreException - { - - } - - public void deleteBridge(final Bridge bridge) throws AMQStoreException - { - - } - + @Override public Transaction newTransaction() { return IN_MEMORY_TRANSACTION; } - - public List<AMQQueue> createQueues() throws AMQException - { - return null; - } - - public Long getNewMessageId() - { - return _messageId.getAndIncrement(); - } - + @Override public boolean isPersistent() { return false; } - private void checkNotClosed() throws MessageStoreClosedException - { - if (_closed.get()) - { - throw new MessageStoreClosedException(); - } + @Override + public void close() throws Exception + { + _closed.getAndSet(true); } - - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java index fc5d2a4e42..0fb7b1f84f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,23 +20,22 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.MessageStoreMessages; -import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.store.decorators.EventDecorator; +import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator; -public abstract class AbstractMessageStore implements MessageStore +public class MemoryMessageStoreFactory implements MessageStoreFactory { - private LogSubject _logSubject; - public void configure(VirtualHost virtualHost) throws Exception + @Override + public MessageStore createMessageStore(LogSubject logSubject) { - _logSubject = new MessageStoreLogSubject(virtualHost, this); - CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); + return new OperationalLoggingDecorator(new EventDecorator(new MemoryMessageStore()), logSubject); } - public void close() throws Exception + @Override + public String getStoreClassName() { - CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); + return MemoryMessageStore.class.getSimpleName(); } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index 2114472592..c76d4f223f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; -import org.apache.qpid.server.logging.LogSubject; /** * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages. @@ -42,9 +41,9 @@ public interface MessageStore extends DurableConfigurationStore void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, - Configuration config, LogSubject logSubject) throws Exception; - + Configuration config) throws Exception; + void activate() throws Exception; public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData); @@ -65,4 +64,8 @@ public interface MessageStore extends DurableConfigurationStore */ void close() throws Exception; + void addEventListener(EventListener eventListener, Event event); + + MessageStore getUnderlyingStore(); + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java new file mode 100644 index 0000000000..aba7456a44 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +public class MessageStoreConstants +{ + + public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java new file mode 100644 index 0000000000..878798eac3 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.logging.LogSubject; + +public interface MessageStoreFactory +{ + MessageStore createMessageStore(LogSubject logSubject); + + String getStoreClassName(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java new file mode 100644 index 0000000000..0b55f74730 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.federation.Bridge; +import org.apache.qpid.server.federation.BrokerLink; +import org.apache.qpid.server.queue.AMQQueue; + +public class NullMessageStore implements MessageStore +{ + @Override + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, + Configuration config) throws Exception + { + } + + @Override + public void createExchange(Exchange exchange) throws AMQStoreException + { + } + + @Override + public void removeExchange(Exchange exchange) throws AMQStoreException + { + } + + @Override + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException + { + } + + @Override + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException + { + } + + @Override + public void createQueue(AMQQueue queue) throws AMQStoreException + { + } + + @Override + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException + { + } + + @Override + public void removeQueue(AMQQueue queue) throws AMQStoreException + { + } + + @Override + public void updateQueue(AMQQueue queue) throws AMQStoreException + { + } + + @Override + public void createBrokerLink(final BrokerLink link) throws AMQStoreException + { + } + + @Override + public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException + { + } + + @Override + public void createBridge(final Bridge bridge) throws AMQStoreException + { + } + + @Override + public void deleteBridge(final Bridge bridge) throws AMQStoreException + { + } + + @Override + public void configureMessageStore(String name, + MessageStoreRecoveryHandler recoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception + { + } + + @Override + public void close() throws Exception + { + } + + @Override + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) + { + return null; + } + + @Override + public boolean isPersistent() + { + return false; + } + + @Override + public Transaction newTransaction() + { + return null; + } + + @Override + public void activate() throws Exception + { + } + + @Override + public void addEventListener(EventListener eventListener, Event event) + { + } + + @Override + public MessageStore getUnderlyingStore() + { + return this; + } +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java new file mode 100644 index 0000000000..7928f613d9 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +public enum State +{ + INITIAL, + CONFIGURING, + RECOVERING, + ACTIVE, + CLOSING, + CLOSED +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java new file mode 100644 index 0000000000..41b3cb81bb --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + + +public class StateManager +{ + private State _state = State.INITIAL; + + public synchronized State getState() + { + return _state; + } + + public synchronized void stateTransition(final State current, final State desired) + { + if (_state != current) + { + throw new IllegalStateException("Cannot transition to the state: " + desired + "; need to be in state: " + current + + "; currently in state: " + _state); + } + _state = desired; + } + + public synchronized boolean isInState(State testedState) + { + return _state.equals(testedState); + } + + public synchronized boolean isNotInState(State testedState) + { + return !isInState(testedState); + } + + public synchronized void checkInState(State checkedState) + { + if (isNotInState(checkedState)) + { + throw new IllegalStateException("Unexpected state. Was : " + _state + " but expected : " + checkedState); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java index 3e720d9de1..7d3bf90a75 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java @@ -31,11 +31,10 @@ public interface StoreFuture public void waitForCompletion() { - } }; boolean isComplete(); void waitForCompletion(); -}
\ No newline at end of file +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java new file mode 100644 index 0000000000..a402e6ee5c --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.decorators; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.federation.Bridge; +import org.apache.qpid.server.federation.BrokerLink; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; + +/** + * AbstractDecorator. All methods <bMUST</b> perform simple + * delegation to their equivalent decorated counterpart without + * change. + */ +public class AbstractDecorator implements MessageStore +{ + protected final MessageStore _decoratedStore; + + public AbstractDecorator(MessageStore store) + { + _decoratedStore = store; + } + + @Override + public void configureMessageStore(String name, + MessageStoreRecoveryHandler messageRecoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration config) throws Exception + { + _decoratedStore.configureMessageStore(name, messageRecoveryHandler, + tlogRecoveryHandler, config); + } + + @Override + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception + { + _decoratedStore.configureConfigStore(name, recoveryHandler, config); + } + + @Override + public void activate() throws Exception + { + _decoratedStore.activate(); + } + + @Override + public void close() throws Exception + { + _decoratedStore.close(); + } + + @Override + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage( + T metaData) + { + return _decoratedStore.addMessage(metaData); + } + + @Override + public void createExchange(Exchange exchange) throws AMQStoreException + { + _decoratedStore.createExchange(exchange); + } + + @Override + public boolean isPersistent() + { + return _decoratedStore.isPersistent(); + } + + @Override + public Transaction newTransaction() + { + return _decoratedStore.newTransaction(); + } + + @Override + public void removeExchange(Exchange exchange) throws AMQStoreException + { + _decoratedStore.removeExchange(exchange); + } + + @Override + public void addEventListener(EventListener eventListener, Event event) + { + _decoratedStore.addEventListener(eventListener, event); + } + + @Override + public void bindQueue(Exchange exchange, AMQShortString routingKey, + AMQQueue queue, FieldTable args) throws AMQStoreException + { + _decoratedStore.bindQueue(exchange, routingKey, queue, args); + } + + @Override + public void unbindQueue(Exchange exchange, AMQShortString routingKey, + AMQQueue queue, FieldTable args) throws AMQStoreException + { + _decoratedStore.unbindQueue(exchange, routingKey, queue, args); + } + + @Override + public void createQueue(AMQQueue queue) throws AMQStoreException + { + _decoratedStore.createQueue(queue); + } + + @Override + public void createQueue(AMQQueue queue, FieldTable arguments) + throws AMQStoreException + { + _decoratedStore.createQueue(queue, arguments); + } + + @Override + public void removeQueue(AMQQueue queue) throws AMQStoreException + { + _decoratedStore.removeQueue(queue); + } + + @Override + public void updateQueue(AMQQueue queue) throws AMQStoreException + { + _decoratedStore.updateQueue(queue); + } + + @Override + public void createBrokerLink(BrokerLink link) throws AMQStoreException + { + _decoratedStore.createBrokerLink(link); + } + + @Override + public void deleteBrokerLink(BrokerLink link) throws AMQStoreException + { + _decoratedStore.deleteBrokerLink(link); + } + + @Override + public void createBridge(Bridge bridge) throws AMQStoreException + { + _decoratedStore.createBridge(bridge); + } + + @Override + public void deleteBridge(Bridge bridge) throws AMQStoreException + { + _decoratedStore.deleteBridge(bridge); + } + + @Override + public MessageStore getUnderlyingStore() + { + return _decoratedStore.getUnderlyingStore(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java new file mode 100644 index 0000000000..dbf179c2e6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.decorators; + +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.EventManager; +import org.apache.qpid.server.store.MessageStore; + +public class EventDecorator extends AbstractDecorator +{ + protected final EventManager _eventManager; + + public EventDecorator(MessageStore store) + { + super(store); + _eventManager = new EventManager(); + } + + @Override + public void activate() throws Exception + { + _eventManager.notifyEvent(Event.BEFORE_ACTIVATE); + _decoratedStore.activate(); + _eventManager.notifyEvent(Event.AFTER_ACTIVATE); + } + + @Override + public void close() throws Exception + { + _eventManager.notifyEvent(Event.BEFORE_CLOSE); + _decoratedStore.close(); + _eventManager.notifyEvent(Event.AFTER_CLOSE); + } + + @Override + public void addEventListener(EventListener eventListener, Event event) + { + _eventManager.addEventListener(eventListener, event); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java new file mode 100644 index 0000000000..81d9645c01 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.decorators; + +import static org.apache.qpid.server.store.MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; + +public class OperationalLoggingDecorator extends AbstractDecorator +{ + protected final LogSubject _logSubject; + + public OperationalLoggingDecorator(final MessageStore decoratedStore, LogSubject logSubject) + { + super(decoratedStore); + _logSubject = logSubject; + } + + @Override + public void configureMessageStore(String name, + MessageStoreRecoveryHandler messageRecoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration config) throws Exception + { + CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED()); + CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED()); + + if (config != null && config.getString(ENVIRONMENT_PATH_PROPERTY) != null) + { + CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(config.getString(ENVIRONMENT_PATH_PROPERTY))); + } + + _decoratedStore.configureMessageStore(name, messageRecoveryHandler, + tlogRecoveryHandler, config); + } + + @Override + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception + { + CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED()); + + _decoratedStore.configureConfigStore(name, recoveryHandler, config); + } + + @Override + public void activate() throws Exception + { + CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_START()); + _decoratedStore.activate(); + CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE()); + } + + @Override + public void close() throws Exception + { + CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); + _decoratedStore.close(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 86304a0984..a3d1a7999d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -18,7 +18,8 @@ * under the License. * */ -package org.apache.qpid.server.store; +package org.apache.qpid.server.store.derby; + import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; @@ -30,13 +31,24 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.ConfigStoreMessages; -import org.apache.qpid.server.logging.messages.MessageStoreMessages; -import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.MessageMetaDataType; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreConstants; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.State; +import org.apache.qpid.server.store.StateManager; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMemoryMessage; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; +import org.apache.qpid.server.store.TransactionLogResource; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -64,9 +76,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** - * An implementation of a {@link MessageStore} that uses Apache Derby as the persistance + * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence * mechanism. - * + * * TODO extract the SQL statements into a generic JDBC store */ public class DerbyMessageStore implements MessageStore @@ -74,9 +86,6 @@ public class DerbyMessageStore implements MessageStore private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); - public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; - - private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; @@ -94,7 +103,7 @@ public class DerbyMessageStore implements MessageStore private static final String XID_TABLE_NAME = "QPID_XIDS"; private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS"; - + private static final int DB_VERSION = 3; @@ -157,7 +166,7 @@ public class DerbyMessageStore implements MessageStore + " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))"; private static final String SELECT_FROM_LINKS = "SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb"; - private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME + private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?"; private static final String SELECT_ALL_FROM_LINKS = "SELECT id_lsb, id_msb, create_time, " + "arguments FROM " + LINKS_TABLE_NAME; @@ -175,12 +184,12 @@ public class DerbyMessageStore implements MessageStore + " link_id_msb bigint not null," + " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))"; private static final String SELECT_FROM_BRIDGES = - "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM " + "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM " + BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?"; - private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME + private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?"; - private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, " - + " create_time," + private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, " + + " create_time," + " link_id_lsb, link_id_msb, " + "arguments FROM " + BRIDGES_TABLE_NAME + " WHERE link_id_lsb = ? and link_id_msb = ?"; @@ -196,7 +205,7 @@ public class DerbyMessageStore implements MessageStore "CREATE TABLE "+XID_TABLE_NAME+" ( format bigint not null," + " global_id varchar(64) for bit data, branch_id varchar(64) for bit data, PRIMARY KEY ( format, " + "global_id, branch_id ))"; - private static final String INSERT_INTO_XIDS = + private static final String INSERT_INTO_XIDS = "INSERT INTO "+XID_TABLE_NAME+" ( format, global_id, branch_id ) values (?, ?, ?)"; private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME + " WHERE format = ? and global_id = ? and branch_id = ?"; @@ -214,104 +223,64 @@ public class DerbyMessageStore implements MessageStore "queue_name, message_id ) values (?,?,?,?,?,?) "; private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME + " WHERE format = ? and global_id = ? and branch_id = ?"; - private static final String SELECT_ALL_FROM_XID_ACTIONS = - "SELECT action_type, queue_name, message_id FROM " + XID_ACTIONS_TABLE_NAME + + private static final String SELECT_ALL_FROM_XID_ACTIONS = + "SELECT action_type, queue_name, message_id FROM " + XID_ACTIONS_TABLE_NAME + " WHERE format = ? and global_id = ? and branch_id = ?"; private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; + private final StateManager _stateManager = new StateManager(); - private LogSubject _logSubject; - private boolean _configured; - - - private static final class CommitStoreFuture implements StoreFuture - { - public boolean isComplete() - { - return true; - } - - public void waitForCompletion() - { - - } - } + private MessageStoreRecoveryHandler _messageRecoveryHandler; - private enum State - { - INITIAL, - CONFIGURING, - RECOVERING, - STARTED, - CLOSING, - CLOSED - } - - private State _state = State.INITIAL; + private TransactionLogRecoveryHandler _tlogRecoveryHandler; + private ConfigurationRecoveryHandler _configRecoveryHandler; + @Override public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - Configuration storeConfiguration, - LogSubject logSubject) throws Exception + ConfigurationRecoveryHandler configRecoveryHandler, + Configuration storeConfiguration) throws Exception { - stateTransition(State.INITIAL, State.CONFIGURING); - _logSubject = logSubject; - CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName())); + _stateManager.stateTransition(State.INITIAL, State.CONFIGURING); + _configRecoveryHandler = configRecoveryHandler; - if(!_configured) - { - commonConfiguration(name, storeConfiguration, logSubject); - _configured = true; - } - - // this recovers durable exchanges, queues, and bindings - recover(recoveryHandler); - - - stateTransition(State.RECOVERING, State.STARTED); + commonConfiguration(name, storeConfiguration); } + @Override public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, - Configuration storeConfiguration, LogSubject logSubject) throws Exception + Configuration storeConfiguration) throws Exception { - if(!_configured) - { - - _logSubject = logSubject; - } - - CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); - - if(!_configured) - { - - commonConfiguration(name, storeConfiguration, logSubject); - _configured = true; - } - - recoverMessages(recoveryHandler); + _tlogRecoveryHandler = tlogRecoveryHandler; + _messageRecoveryHandler = recoveryHandler; + } - CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName())); + @Override + public void activate() throws Exception + { + _stateManager.stateTransition(State.CONFIGURING, State.RECOVERING); - TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(tlogRecoveryHandler); + // this recovers durable exchanges, queues, and bindings + recoverConfiguration(_configRecoveryHandler); + recoverMessages(_messageRecoveryHandler); + TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler); recoverXids(dtxrh); - + _stateManager.stateTransition(State.RECOVERING, State.ACTIVE); } - private void commonConfiguration(String name, Configuration storeConfiguration, LogSubject logSubject) + private void commonConfiguration(String name, Configuration storeConfiguration) throws ClassNotFoundException, SQLException { initialiseDriver(); //Update to pick up QPID_WORK and use that as the default location not just derbyDB - final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK") + final String databasePath = storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK") + File.separator + "derbyDB"); File environmentPath = new File(databasePath); @@ -324,8 +293,6 @@ public class DerbyMessageStore implements MessageStore } } - CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath())); - createOrOpenDatabase(name, databasePath); } @@ -576,19 +543,15 @@ public class DerbyMessageStore implements MessageStore { stmt.close(); } - } - public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQException + private void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) throws AMQException { - stateTransition(State.CONFIGURING, State.RECOVERING); - - CurrentActor.get().message(_logSubject,MessageStoreMessages.RECOVERY_START()); try { ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this); - loadQueues(qrh); + recoverQueues(qrh); ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); List<String> exchanges = loadExchanges(erh); @@ -632,12 +595,12 @@ public class DerbyMessageStore implements MessageStore Blob argumentsAsBlob = rs.getBlob(4); byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length()); - + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes)); int size = dis.readInt(); - + Map<String,String> arguments = new HashMap<String, String>(); - + for(int i = 0; i < size; i++) { arguments.put(dis.readUTF(), dis.readUTF()); @@ -744,7 +707,7 @@ public class DerbyMessageStore implements MessageStore } - private void loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException + private void recoverQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException { Connection conn = newAutoCommitConnection(); try @@ -759,6 +722,7 @@ public class DerbyMessageStore implements MessageStore while(rs.next()) { String queueName = rs.getString(1); + _logger.debug("Got queue " + queueName); String owner = rs.getString(2); boolean exclusive = rs.getBoolean(3); Blob argumentsAsBlob = rs.getBlob(4); @@ -913,10 +877,11 @@ public class DerbyMessageStore implements MessageStore + @Override public void close() throws Exception { - CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); _closed.getAndSet(true); + _stateManager.stateTransition(State.ACTIVE, State.CLOSING); try { @@ -926,9 +891,9 @@ public class DerbyMessageStore implements MessageStore _logger.error("Unable to shut down the store"); } catch (SQLException e) - { - if (e.getSQLState().equalsIgnoreCase(DERBY_SINGLE_DB_SHUTDOWN_CODE)) - { + { + if (e.getSQLState().equalsIgnoreCase(DERBY_SINGLE_DB_SHUTDOWN_CODE)) + { //expected and represents a clean shutdown of this database only, do nothing. } else @@ -936,8 +901,11 @@ public class DerbyMessageStore implements MessageStore _logger.error("Exception whilst shutting down the store: " + e); } } + + _stateManager.stateTransition(State.CLOSING, State.CLOSED); } + @Override public StoredMessage addMessage(StorableMessageMetaData metaData) { if(metaData.isPersistent()) @@ -1015,9 +983,10 @@ public class DerbyMessageStore implements MessageStore } + @Override public void createExchange(Exchange exchange) throws AMQStoreException { - if (_state != State.RECOVERING) + if (_stateManager.isInState(State.ACTIVE)) { try { @@ -1077,6 +1046,7 @@ public class DerbyMessageStore implements MessageStore } + @Override public void removeExchange(Exchange exchange) throws AMQStoreException { @@ -1112,10 +1082,11 @@ public class DerbyMessageStore implements MessageStore } } + @Override public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { - if (_state != State.RECOVERING) + if (_stateManager.isInState(State.ACTIVE)) { try { @@ -1189,6 +1160,7 @@ public class DerbyMessageStore implements MessageStore } + @Override public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { @@ -1224,16 +1196,18 @@ public class DerbyMessageStore implements MessageStore } } + @Override public void createQueue(AMQQueue queue) throws AMQStoreException { createQueue(queue, null); } + @Override public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException { _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called"); - if (_state != State.RECOVERING) + if (_stateManager.isInState(State.ACTIVE)) { try { @@ -1299,7 +1273,7 @@ public class DerbyMessageStore implements MessageStore } } } - + /** * Updates the specified queue in the persistent store, IF it is already present. If the queue * is not present in the store, it will not be added. @@ -1309,9 +1283,10 @@ public class DerbyMessageStore implements MessageStore * @param queue The queue to update the entry for. * @throws AMQStoreException If the operation fails for any reason. */ + @Override public void updateQueue(final AMQQueue queue) throws AMQStoreException { - if (_state != State.RECOVERING) + if (_stateManager.isInState(State.ACTIVE)) { try { @@ -1363,7 +1338,7 @@ public class DerbyMessageStore implements MessageStore throw new AMQStoreException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e); } } - + } /** @@ -1389,7 +1364,7 @@ public class DerbyMessageStore implements MessageStore throw sqlEx; } } - + return connection; } @@ -1419,6 +1394,7 @@ public class DerbyMessageStore implements MessageStore return connection; } + @Override public void removeQueue(final AMQQueue queue) throws AMQStoreException { AMQShortString name = queue.getNameShortString(); @@ -1450,11 +1426,12 @@ public class DerbyMessageStore implements MessageStore } + @Override public void createBrokerLink(final BrokerLink link) throws AMQStoreException { _logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called"); - if (_state != State.RECOVERING) + if (_stateManager.isInState(State.ACTIVE)) { try { @@ -1463,7 +1440,7 @@ public class DerbyMessageStore implements MessageStore PreparedStatement stmt = conn.prepareStatement(FIND_LINK); try { - + stmt.setLong(1, link.getId().getLeastSignificantBits()); stmt.setLong(2, link.getId().getMostSignificantBits()); ResultSet rs = stmt.executeQuery(); @@ -1477,7 +1454,7 @@ public class DerbyMessageStore implements MessageStore try { - + insertStmt.setLong(1, link.getId().getLeastSignificantBits()); insertStmt.setLong(2, link.getId().getMostSignificantBits()); insertStmt.setLong(3, link.getCreateTime()); @@ -1546,6 +1523,7 @@ public class DerbyMessageStore implements MessageStore return argumentBytes; } + @Override public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException { _logger.debug("public void deleteBrokerLink( " + link + "): called"); @@ -1577,11 +1555,12 @@ public class DerbyMessageStore implements MessageStore } + @Override public void createBridge(final Bridge bridge) throws AMQStoreException { _logger.debug("public void createBridge(BrokerLink = " + bridge + "): called"); - if (_state != State.RECOVERING) + if (_stateManager.isInState(State.ACTIVE)) { try { @@ -1647,6 +1626,7 @@ public class DerbyMessageStore implements MessageStore } } + @Override public void deleteBridge(final Bridge bridge) throws AMQStoreException { _logger.debug("public void deleteBridge( " + bridge + "): called"); @@ -1677,6 +1657,7 @@ public class DerbyMessageStore implements MessageStore } + @Override public Transaction newTransaction() { return new DerbyTransaction(); @@ -1695,7 +1676,7 @@ public class DerbyMessageStore implements MessageStore { _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]"); } - + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY); try { @@ -1834,7 +1815,7 @@ public class DerbyMessageStore implements MessageStore { stmt.close(); } - + stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS); try @@ -1879,7 +1860,7 @@ public class DerbyMessageStore implements MessageStore } } - + private static final class ConnectionWrapper { private final Connection _connection; @@ -1924,7 +1905,7 @@ public class DerbyMessageStore implements MessageStore public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQStoreException { commitTran(connWrapper); - return new CommitStoreFuture(); + return StoreFuture.IMMEDIATE_FUTURE; } public void abortTran(ConnectionWrapper connWrapper) throws AMQStoreException @@ -1965,7 +1946,7 @@ public class DerbyMessageStore implements MessageStore { _logger.debug("Adding metadata for message " +messageId); } - + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA); try { @@ -2008,7 +1989,7 @@ public class DerbyMessageStore implements MessageStore { stmt.close(); } - + } @@ -2154,31 +2135,37 @@ public class DerbyMessageStore implements MessageStore _messageNumber = messageNumber; } + @Override public TransactionLogResource getQueue() { return this; } + @Override public EnqueableMessage getMessage() { return this; } + @Override public long getMessageNumber() { return _messageNumber; } + @Override public boolean isPersistent() { return true; } + @Override public StoredMessage getStoredMessage() { throw new UnsupportedOperationException(); } + @Override public String getResourceName() { return _queueName; @@ -2191,7 +2178,7 @@ public class DerbyMessageStore implements MessageStore try { List<Xid> xids = new ArrayList<Xid>(); - + Statement stmt = conn.createStatement(); try { @@ -2217,15 +2204,15 @@ public class DerbyMessageStore implements MessageStore stmt.close(); } - - + + for(Xid xid : xids) { List<RecordImpl> enqueues = new ArrayList<RecordImpl>(); List<RecordImpl> dequeues = new ArrayList<RecordImpl>(); - + PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS); - + try { pstmt.setLong(1, xid.getFormat()); @@ -2256,13 +2243,13 @@ public class DerbyMessageStore implements MessageStore { pstmt.close(); } - - dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), - enqueues.toArray(new RecordImpl[enqueues.size()]), + + dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), + enqueues.toArray(new RecordImpl[enqueues.size()]), dequeues.toArray(new RecordImpl[dequeues.size()])); } - - + + dtxrh.completeDtxRecordRecovery(); } finally @@ -2271,7 +2258,7 @@ public class DerbyMessageStore implements MessageStore } } - + StorableMessageMetaData getMetaData(long messageId) throws SQLException { @@ -2417,24 +2404,13 @@ public class DerbyMessageStore implements MessageStore } + @Override public boolean isPersistent() { return true; } - private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException - { - if (_state != requiredState) - { - throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState - + "; currently in state: " + _state); - } - - _state = newState; - } - - private class DerbyTransaction implements Transaction { private final ConnectionWrapper _connWrapper; @@ -2452,6 +2428,7 @@ public class DerbyMessageStore implements MessageStore } } + @Override public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { if(message.getStoredMessage() instanceof StoredDerbyMessage) @@ -2469,32 +2446,38 @@ public class DerbyMessageStore implements MessageStore DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber()); } + @Override public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber()); } + @Override public void commitTran() throws AMQStoreException { DerbyMessageStore.this.commitTran(_connWrapper); } + @Override public StoreFuture commitTranAsync() throws AMQStoreException { return DerbyMessageStore.this.commitTranAsync(_connWrapper); } + @Override public void abortTran() throws AMQStoreException { DerbyMessageStore.this.abortTran(_connWrapper); } + @Override public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException { DerbyMessageStore.this.removeXid(_connWrapper, format, globalId, branchId); } + @Override public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) throws AMQStoreException { @@ -2512,7 +2495,7 @@ public class DerbyMessageStore implements MessageStore private volatile SoftReference<StorableMessageMetaData> _metaDataRef; private byte[] _data; private volatile SoftReference<byte[]> _dataRef; - + StoredDerbyMessage(long messageId, StorableMessageMetaData metaData) { @@ -2524,15 +2507,16 @@ public class DerbyMessageStore implements MessageStore StorableMessageMetaData metaData, boolean persist) { _messageId = messageId; - + _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); if(persist) { - _metaData = metaData; + _metaData = metaData; } } + @Override public StorableMessageMetaData getMetaData() { StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData; @@ -2552,11 +2536,13 @@ public class DerbyMessageStore implements MessageStore return metaData; } + @Override public long getMessageNumber() { return _messageId; } + @Override public void addContent(int offsetInMessage, java.nio.ByteBuffer src) { src = src.slice(); @@ -2576,9 +2562,10 @@ public class DerbyMessageStore implements MessageStore System.arraycopy(oldData,0,_data,0,oldData.length); src.duplicate().get(_data, oldData.length, src.remaining()); } - + } + @Override public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) { byte[] data = _dataRef == null ? null : _dataRef.get(); @@ -2595,6 +2582,7 @@ public class DerbyMessageStore implements MessageStore } + @Override public ByteBuffer getContent(int offsetInMessage, int size) { ByteBuffer buf = ByteBuffer.allocate(size); @@ -2603,6 +2591,7 @@ public class DerbyMessageStore implements MessageStore return buf; } + @Override public synchronized StoreFuture flushToStore() { try @@ -2612,7 +2601,7 @@ public class DerbyMessageStore implements MessageStore Connection conn = newConnection(); store(conn); - + conn.commit(); conn.close(); } @@ -2651,6 +2640,7 @@ public class DerbyMessageStore implements MessageStore } } + @Override public void remove() { DerbyMessageStore.this.removeMessage(_messageId); @@ -2687,4 +2677,21 @@ public class DerbyMessageStore implements MessageStore } } + @Override + public void addEventListener(EventListener eventListener, Event event) + { + throw new UnsupportedOperationException(); + } + + @Override + public MessageStore getUnderlyingStore() + { + return this; + } + + public String getDatabaseProviderName() + { + return DerbyMessageStore.class.getName(); + } + }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java new file mode 100644 index 0000000000..02b59dfc06 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.derby; + +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreFactory; +import org.apache.qpid.server.store.decorators.EventDecorator; +import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator; + +public class DerbyMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public MessageStore createMessageStore(LogSubject logSubject) + { + return new OperationalLoggingDecorator(new EventDecorator(new DerbyMessageStore()), logSubject); + } + + @Override + public String getStoreClassName() + { + return DerbyMessageStore.class.getSimpleName(); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 28d8cb2ec7..5460c89eab 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticationResult; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.subscription.Subscription_0_10; +import org.apache.qpid.server.virtualhost.State; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; import org.slf4j.Logger; @@ -177,6 +178,11 @@ public class ServerConnectionDelegate extends ServerDelegate sconn.setState(Connection.State.CLOSING); sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'")); } + else if (vhost.getState() != State.ACTIVE) + { + sconn.setState(Connection.State.CLOSING); + sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Virtual host '"+vhostName+"' is not active")); + } else { sconn.setState(Connection.State.OPEN); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java new file mode 100644 index 0000000000..fb50b3e289 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +public enum State +{ + INITIALISING, + ACTIVE, + PASSIVE, + STOPPED +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 00c8d1ff27..2ef110641e 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -97,4 +97,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo void removeBrokerConnection(BrokerLink brokerLink); ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask); + + State getState(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 1da5b8d0c7..1db26a5f04 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Map; import java.util.TreeMap; import java.util.UUID; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; @@ -92,7 +93,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public VirtualHostConfigRecoveryHandler begin(MessageStore store) { - _logSubject = new MessageStoreLogSubject(_virtualHost,store); + _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName()); _store = store; CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false)); @@ -355,31 +356,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); } - private static final class ProcessAction - { - private final AMQQueue _queue; - private final AMQMessage _message; - - public ProcessAction(AMQQueue queue, AMQMessage message) - { - _queue = queue; - _message = message; - } - - public void process() - { - try - { - _queue.enqueue(_message); - } - catch(AMQException e) - { - throw new RuntimeException(e); - } - } - - } - public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf) { try diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index eccaf553cd..530be46d70 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -20,15 +20,11 @@ */ package org.apache.qpid.server.virtualhost; -import java.util.concurrent.ScheduledFuture; -import org.apache.commons.configuration.Configuration; + import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; -import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQBrokerManagerMBean; import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.configuration.BrokerConfig; @@ -45,9 +41,7 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; -import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; @@ -62,26 +56,28 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; +import org.apache.qpid.server.store.MessageStoreFactory; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; +import javax.management.JMException; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; + import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; + public class VirtualHostImpl implements VirtualHost { private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class); @@ -104,29 +100,31 @@ public class VirtualHostImpl implements VirtualHost private final BrokerConfig _brokerConfig; - private final VirtualHostConfiguration _configuration; + private final VirtualHostConfiguration _vhostConfig; - private ConnectionRegistry _connectionRegistry; + private final VirtualHostMBean _virtualHostMBean; - private QueueRegistry _queueRegistry; + private final AMQBrokerManagerMBean _brokerMBean; - private ExchangeRegistry _exchangeRegistry; + private final QueueRegistry _queueRegistry; - private ExchangeFactory _exchangeFactory; + private final ExchangeRegistry _exchangeRegistry; - private MessageStore _messageStore; + private final ExchangeFactory _exchangeFactory; - private DtxRegistry _dtxRegistry; + private final ConnectionRegistry _connectionRegistry; - private VirtualHostMBean _virtualHostMBean; + private final BindingFactory _bindingFactory; - private AMQBrokerManagerMBean _brokerMBean; + private final DtxRegistry _dtxRegistry; - private BindingFactory _bindingFactory; + private final MessageStore _messageStore; + + private State _state = State.INITIALISING; private boolean _statisticsEnabled = false; - private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception { @@ -135,53 +133,45 @@ public class VirtualHostImpl implements VirtualHost throw new IllegalArgumentException("HostConfig cannot be null"); } + if (hostConfig.getName() == null || hostConfig.getName().length() == 0) + { + throw new IllegalArgumentException("Illegal name (" + hostConfig.getName() + ") for virtualhost."); + } + _appRegistry = appRegistry; _brokerConfig = _appRegistry.getBroker(); - _configuration = hostConfig; - _name = _configuration.getName(); + _vhostConfig = hostConfig; + _name = _vhostConfig.getName(); _dtxRegistry = new DtxRegistry(); _id = _appRegistry.getConfigStore().createId(); CurrentActor.get().message(VirtualHostMessages.CREATED(_name)); - if (_name == null || _name.length() == 0) - { - throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost."); - } - - _securityManager = new SecurityManager(_appRegistry.getSecurityManager()); - _securityManager.configureHostPlugins(_configuration); - _virtualHostMBean = new VirtualHostMBean(); + _securityManager = new SecurityManager(_appRegistry.getSecurityManager()); + _securityManager.configureHostPlugins(_vhostConfig); _connectionRegistry = new ConnectionRegistry(); - _houseKeepingTasks = new ScheduledThreadPoolExecutor(_configuration.getHouseKeepingThreadCount()); + _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount()); _queueRegistry = new DefaultQueueRegistry(this); _exchangeFactory = new DefaultExchangeFactory(this); - _exchangeFactory.initialise(_configuration); + _exchangeFactory.initialise(_vhostConfig); _exchangeRegistry = new DefaultExchangeRegistry(this); - StartupRoutingTable configFileRT = new StartupRoutingTable(); - - _messageStore = configFileRT; - - // This needs to be after the RT has been defined as it creates the default durable exchanges. - _exchangeRegistry.initialise(); - _bindingFactory = new BindingFactory(this); - initialiseModel(_configuration); + _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); - initialiseMessageStore(hostConfig); + _messageStore = initialiseMessageStore(hostConfig.getMessageStoreFactoryClass()); - _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); - _brokerMBean.register(); - initialiseHouseKeeping(hostConfig.getHousekeepingCheckPeriod()); + configureMessageStore(hostConfig); + + activateNonHAMessageStore(); initialiseStatistics(); } @@ -193,7 +183,7 @@ public class VirtualHostImpl implements VirtualHost public VirtualHostConfiguration getConfiguration() { - return _configuration; + return _vhostConfig; } public UUID getId() @@ -217,47 +207,16 @@ public class VirtualHostImpl implements VirtualHost } /** - * Virtual host JMX MBean class. - * - * This has some of the methods implemented from management interface for exchanges. Any - * Implementation of an Exchange MBean should extend this class. - */ - public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost - { - public VirtualHostMBean() throws NotCompliantMBeanException - { - super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE); - } - - public String getObjectInstanceName() - { - return ObjectName.quote(_name); - } - - public String getName() - { - return _name; - } - - public VirtualHostImpl getVirtualHost() - { - return VirtualHostImpl.this; - } - } - - - /** * Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers * and checking for idle or open transactions that have exceeded the permitted thresholds. * * @param period */ - private void initialiseHouseKeeping(long period) + private void initialiseHouseKeeping(long period) { + if (period != 0L) { - - scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask()); Map<String, VirtualHostPluginFactory> plugins = _appRegistry.getPluginManager().getVirtualHostPlugins(); @@ -290,50 +249,30 @@ public class VirtualHostImpl implements VirtualHost } } - private class VirtualHostHouseKeepingTask extends HouseKeepingTask + private void shutdownHouseKeeping() { - public VirtualHostHouseKeepingTask() - { - super(VirtualHostImpl.this); - } + _houseKeepingTasks.shutdown(); - public void execute() + try + { + if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) + { + _houseKeepingTasks.shutdownNow(); + } + } + catch (InterruptedException e) + { + _logger.warn("Interrupted during Housekeeping shutdown:", e); + Thread.currentThread().interrupt(); + } + } + + private void removeHouseKeepingTasks() + { + BlockingQueue<Runnable> taskQueue = _houseKeepingTasks.getQueue(); + for (final Runnable runnable : taskQueue) { - for (AMQQueue q : _queueRegistry.getQueues()) - { - _logger.debug("Checking message status for queue: " - + q.getName()); - try - { - q.checkMessageStatus(); - } - catch (Exception e) - { - _logger.error("Exception in housekeeping for queue: " - + q.getNameShortString().toString(), e); - //Don't throw exceptions as this will stop the - // house keeping task from running. - } - } - for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) - { - _logger.debug("Checking for long running open transactions on connection " + connection); - for (AMQSessionModel session : connection.getSessionModels()) - { - _logger.debug("Checking for long running open transactions on session " + session); - try - { - session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(), - _configuration.getTransactionTimeoutOpenClose(), - _configuration.getTransactionTimeoutIdleWarn(), - _configuration.getTransactionTimeoutIdleClose()); - } - catch (Exception e) - { - _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); - } - } - } + _houseKeepingTasks.remove(runnable); } } @@ -381,36 +320,43 @@ public class VirtualHostImpl implements VirtualHost } - private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception + private MessageStore initialiseMessageStore(final String messageStoreFactoryClass) throws Exception { - String messageStoreClass = hostConfig.getMessageStoreClass(); - - Class<?> clazz = Class.forName(messageStoreClass); - Object o = clazz.newInstance(); + final Class<?> clazz = Class.forName(messageStoreFactoryClass); + final Object o = clazz.newInstance(); - if (!(o instanceof MessageStore)) + if (!(o instanceof MessageStoreFactory)) { - throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + - " does not."); + throw new ClassCastException("Message store factory class must implement " + MessageStoreFactory.class + + ". Class " + clazz + " does not."); } - MessageStore messageStore = (MessageStore) o; - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); - MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore); + final MessageStoreFactory messageStoreFactory = (MessageStoreFactory) o; + final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStoreFactory.getStoreClassName()); + final MessageStore messageStore = messageStoreFactory.createMessageStore(storeLogSubject); - messageStore.configureConfigStore(this.getName(), - recoveryHandler, - hostConfig.getStoreConfiguration(), - storeLogSubject); + messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE); + messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE); + messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE); + messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE); - messageStore.configureMessageStore(this.getName(), - recoveryHandler, - recoveryHandler, - hostConfig.getStoreConfiguration(), storeLogSubject); + return messageStore; + } - _messageStore = messageStore; + private void configureMessageStore(VirtualHostConfiguration hostConfig) throws Exception + { + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); + // TODO perhaps pass config on construction?? + _messageStore.configureConfigStore(getName(), recoveryHandler, hostConfig.getStoreConfiguration()); + _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler, hostConfig.getStoreConfiguration()); + + } + + private void activateNonHAMessageStore() throws Exception + { + _messageStore.activate(); } private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException @@ -424,7 +370,7 @@ public class VirtualHostImpl implements VirtualHost configureExchange(config.getExchangeConfiguration(exchangeName)); } - String[] queueNames = config.getQueueNames(); + String[] queueNames = config.getQueueNames(); for (Object queueNameObj : queueNames) { @@ -435,16 +381,16 @@ public class VirtualHostImpl implements VirtualHost private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException { - AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName()); + AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName()); Exchange exchange; exchange = _exchangeRegistry.getExchange(exchangeName); if (exchange == null) { - AMQShortString type = new AMQShortString(exchangeConfiguration.getType()); - boolean durable = exchangeConfiguration.getDurable(); - boolean autodelete = exchangeConfiguration.getAutoDelete(); + AMQShortString type = new AMQShortString(exchangeConfiguration.getType()); + boolean durable = exchangeConfiguration.getDurable(); + boolean autodelete = exchangeConfiguration.getAutoDelete(); Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0); _exchangeRegistry.registerExchange(newExchange); @@ -458,7 +404,7 @@ public class VirtualHostImpl implements VirtualHost private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); String queueName = queue.getName(); if (queue.isDurable()) @@ -467,13 +413,13 @@ public class VirtualHostImpl implements VirtualHost } //get the exchange name (returns default exchange name if none was specified) - String exchangeName = queueConfiguration.getExchange(); + String exchangeName = queueConfiguration.getExchange(); Exchange exchange = _exchangeRegistry.getExchange(exchangeName); - if (exchange == null) - { + if (exchange == null) + { throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName); - } + } Exchange defaultExchange = _exchangeRegistry.getDefaultExchange(); @@ -561,39 +507,8 @@ public class VirtualHostImpl implements VirtualHost { //Stop Connections _connectionRegistry.close(); - - //Stop the Queues processing - if (_queueRegistry != null) - { - for (AMQQueue queue : _queueRegistry.getQueues()) - { - queue.stop(); - } - } - - //Stop Housekeeping - if (_houseKeepingTasks != null) - { - _houseKeepingTasks.shutdown(); - - try - { - if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) - { - _houseKeepingTasks.shutdownNow(); - } - } - catch (InterruptedException e) - { - _logger.warn("Interrupted during Housekeeping shutdown:" + e.getMessage()); - // Swallowing InterruptedException ok as we are shutting down. - } - } - - if(_dtxRegistry != null) - { - _dtxRegistry.close(); - } + _queueRegistry.stopAllAndUnregisterMBeans(); + _dtxRegistry.close(); //Close MessageStore if (_messageStore != null) @@ -609,6 +524,8 @@ public class VirtualHostImpl implements VirtualHost } } + _state = State.STOPPED; + CurrentActor.get().message(VirtualHostMessages.CLOSED()); } @@ -720,7 +637,6 @@ public class VirtualHostImpl implements VirtualHost return blink; } - public void createBrokerConnection(final String transport, final String host, final int port, @@ -767,105 +683,165 @@ public class VirtualHostImpl implements VirtualHost return _dtxRegistry; } - /** - * Temporary Startup RT class to record the creation of persistent queues / exchanges. - * - * - * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded. - * This should be removed after the _RT has been fully split from the the TL - */ - private static class StartupRoutingTable implements MessageStore + @Override + public String toString() { - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - Configuration config, - LogSubject logSubject) throws Exception - { - } - - public void createExchange(Exchange exchange) throws AMQStoreException - { - } - - public void removeExchange(Exchange exchange) throws AMQStoreException - { - } - - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException - { - } - - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException - { - } - - public void createQueue(AMQQueue queue) throws AMQStoreException - { - } - - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException - { - } - - public void removeQueue(AMQQueue queue) throws AMQStoreException - { - } - - public void updateQueue(AMQQueue queue) throws AMQStoreException - { - } + return _name; + } - public void createBrokerLink(final BrokerLink link) throws AMQStoreException - { - } + @Override + public State getState() + { + return _state; + } - public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException - { - } - public void createBridge(final Bridge bridge) throws AMQStoreException + /** + * Virtual host JMX MBean class. + * + * This has some of the methods implemented from management interface for exchanges. Any + * Implementation of an Exchange MBean should extend this class. + */ + public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost + { + public VirtualHostMBean() throws NotCompliantMBeanException { + super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE); } - public void deleteBridge(final Bridge bridge) throws AMQStoreException + public String getObjectInstanceName() { + return ObjectName.quote(_name); } - @Override - public void configureMessageStore(String name, - MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config, LogSubject logSubject) throws Exception + public String getName() { + return _name; } - @Override - public void close() throws Exception + public VirtualHostImpl getVirtualHost() { + return VirtualHostImpl.this; } + } - @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage( - T metaData) - { - return null; - } + private final class BeforeActivationListener implements EventListener + { + @Override + public void event(Event event) + { + try + { + _exchangeRegistry.initialise(); + initialiseModel(_vhostConfig); + } + catch (Exception e) + { + throw new RuntimeException("Failed to initialise virtual host after state change", e); + } + } + } + + private final class AfterActivationListener implements EventListener + { + @Override + public void event(Event event) + { + initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); + try + { + _brokerMBean.register(); + } + catch (JMException e) + { + throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e); + } + + _state = State.ACTIVE; + } + } + + public class BeforePassivationListener implements EventListener + { @Override - public boolean isPersistent() + public void event(Event event) { - return false; - } + _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); + _brokerMBean.unregister(); + removeHouseKeepingTasks(); - @Override - public Transaction newTransaction() - { - return null; - } - } + _queueRegistry.stopAllAndUnregisterMBeans(); + _exchangeRegistry.clearAndUnregisterMbeans(); + _dtxRegistry.close(); - @Override - public String toString() - { - return _name; - } + _state = State.PASSIVE; + } + } + + private final class BeforeCloseListener implements EventListener + { + @Override + public void event(Event event) + { + _brokerMBean.unregister(); + shutdownHouseKeeping(); + } + } + + private class VirtualHostHouseKeepingTask extends HouseKeepingTask + { + public VirtualHostHouseKeepingTask() + { + super(VirtualHostImpl.this); + } + + public void execute() + { + for (AMQQueue q : _queueRegistry.getQueues()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Checking message status for queue: " + + q.getName()); + } + try + { + q.checkMessageStatus(); + } + catch (Exception e) + { + _logger.error("Exception in housekeeping for queue: " + + q.getNameShortString().toString(), e); + //Don't throw exceptions as this will stop the + // house keeping task from running. + } + } + for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Checking for long running open transactions on connection " + connection); + } + for (AMQSessionModel session : connection.getSessionModels()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Checking for long running open transactions on session " + session); + } + try + { + session.checkTransactionStatus(_vhostConfig.getTransactionTimeoutOpenWarn(), + _vhostConfig.getTransactionTimeoutOpenClose(), + _vhostConfig.getTransactionTimeoutIdleWarn(), + _vhostConfig.getTransactionTimeoutIdleClose()); + } + catch (Exception e) + { + _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); + } + } + } + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java index d34d1bbef3..5c500771c2 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory; import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -165,7 +166,7 @@ public class AMQBrokerManagerMBeanTest extends QpidTestCase XMLConfiguration configXml = new XMLConfiguration(); configXml.addProperty("virtualhosts.virtualhost(-1).name", "test"); - configXml.addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName()); + configXml.addProperty("virtualhosts.virtualhost(-1).test.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName()); ServerConfiguration configuration = new ServerConfiguration(configXml); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index 50e7f0588b..c4c93acfb6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -161,7 +162,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "r2d2"); getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues.queue.r2d2.deadLetterQueues", "true"); getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "c3p0"); - getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.class", TestableMemoryMessageStore.class.getName()); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName()); // Start the broker now. super.createBroker(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 488f251b0a..b6ee95a1cb 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -20,8 +20,18 @@ */ package org.apache.qpid.server.exchange; -import org.apache.log4j.Logger; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -31,7 +41,6 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageMetaData; @@ -44,23 +53,10 @@ import org.apache.qpid.server.queue.MockStoredMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.util.InternalBrokerBaseCase; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase { private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class); @@ -68,24 +64,6 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase private final HeadersExchange exchange = new HeadersExchange(); protected final Set<TestQueue> queues = new HashSet<TestQueue>(); - - - /** - * Not used in this test, just there to stub out the routing calls - */ - private MemoryMessageStore _store = new MemoryMessageStore(); - - - private BindingFactory bindingFactory = new BindingFactory(new DurableConfigurationStore.Source() - { - - public DurableConfigurationStore getMessageStore() - { - return _store; - } - }, - exchange); - private int count; public void testDoNothing() @@ -103,7 +81,6 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase protected void unbind(TestQueue queue, String... bindings) throws AMQException { String queueName = queue.getName(); - //TODO - check this exchange.onUnbind(new Binding(null,queueName, queue, exchange, getHeadersMap(bindings))); } @@ -538,12 +515,6 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase } - public AMQMessage getUnderlyingMessage() - { - return Message.this; - } - - public ContentHeaderBody getContentHeader() { try diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java index cc032a0430..3377573b9d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java @@ -29,12 +29,10 @@ public class MessageStoreMessagesTest extends AbstractTestMessages { public void testMessageStoreCreated() { - String name = "DerbyMessageStore"; - - _logMessage = MessageStoreMessages.CREATED(name); + _logMessage = MessageStoreMessages.CREATED(); List<Object> log = performLog(); - String[] expected = {"Created :", name}; + String[] expected = {"Created"}; validateLogMessage(log, "MST-1001", expected); } @@ -70,56 +68,4 @@ public class MessageStoreMessagesTest extends AbstractTestMessages validateLogMessage(log, "MST-1004", expected); } -/* - public void testMessageStoreRecoveryStart_withQueue() - { - String queueName = "testQueue"; - - _logMessage = MessageStoreMessages.RECOVERY_START(queueName, true); - List<Object> log = performLog(); - - String[] expected = {"Recovery Start :", queueName}; - - validateLogMessage(log, "MST-1004", expected); - } - - public void testMessageStoreRecovered() - { - String queueName = "testQueue"; - Integer messasgeCount = 2000; - - _logMessage = MessageStoreMessages.MST_RECOVERED(messasgeCount, queueName); - List<Object> log = performLog(); - - // Here we use MessageFormat to ensure the messasgeCount of 2000 is - // reformated for display as '2,000' - String[] expected = {"Recovered ", - MessageFormat.format("{0,number}", messasgeCount), - "messages for queue", queueName}; - - validateLogMessage(log, "MST-1005", expected); - } - - public void testMessageStoreRecoveryComplete() - { - _logMessage = MessageStoreMessages.MST_RECOVERY_COMPLETE(null,false); - List<Object> log = performLog(); - - String[] expected = {"Recovery Complete"}; - - validateLogMessage(log, "MST-1006", expected); - } - - public void testMessageStoreRecoveryComplete_withQueue() - { - String queueName = "testQueue"; - - _logMessage = MessageStoreMessages.MST_RECOVERY_COMPLETE(queueName, true); - List<Object> log = performLog(); - - String[] expected = {"Recovery Complete :", queueName}; - - validateLogMessage(log, "MST-1006", expected); - } - */ } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java index 158fb667a9..c62b24c3b9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java @@ -37,13 +37,13 @@ public class MessageStoreLogSubjectTest extends AbstractTestLogSubject _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry(). getVirtualHost("test"); - _subject = new MessageStoreLogSubject(_testVhost, _testVhost.getMessageStore()); + _subject = new MessageStoreLogSubject(_testVhost, _testVhost.getMessageStore().getClass().getSimpleName()); } /** * Validate that the logged Subject message is as expected: * MESSAGE [Blank][vh(/test)/ms(MemoryMessageStore)] <Log Message> - * @param message the message whos format needs validation + * @param message the message who's format needs validation */ @Override protected void validateLogStatement(String message) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index 71d5211470..fe9bcc57a6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -30,7 +30,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -45,7 +45,7 @@ public class AMQProtocolSessionMBeanTest extends InternalBrokerBaseCase /** Used for debugging. */ private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class); - private MessageStore _messageStore = new SkeletonMessageStore(); + private MessageStore _messageStore = new TestableMemoryMessageStore(); private AMQProtocolEngine _protocolSession; private AMQChannel _channel; private AMQProtocolSessionMBean _mbean; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 337ff194c3..2e3ff90df9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory; import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -53,7 +54,7 @@ public class AMQQueueFactoryTest extends QpidTestCase XMLConfiguration configXml = new XMLConfiguration(); configXml.addProperty("virtualhosts.virtualhost(-1).name", getName()); - configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName()); + configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.factoryclass", TestableMemoryMessageStoreFactory.class.getName()); ServerConfiguration configuration = new ServerConfiguration(configXml); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 45933e7064..d588cdd42c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -157,7 +157,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase private void verifyBrokerState() { - TestableMemoryMessageStore store = (TestableMemoryMessageStore) getVirtualHost().getMessageStore(); + TestableMemoryMessageStore store = (TestableMemoryMessageStore) getVirtualHost().getMessageStore().getUnderlyingStore(); // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up. diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 273f0dc018..409b9fd92e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; @@ -37,7 +36,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TestMemoryMessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -53,13 +52,11 @@ import java.util.Set; */ public class AckTest extends InternalBrokerBaseCase { - private static final Logger _log = Logger.getLogger(AckTest.class); - private Subscription _subscription; private AMQProtocolSession _protocolSession; - private TestMemoryMessageStore _messageStore; + private TestableMemoryMessageStore _messageStore; private AMQChannel _channel; @@ -73,7 +70,7 @@ public class AckTest extends InternalBrokerBaseCase { super.setUp(); _virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); - _messageStore = new TestMemoryMessageStore(); + _messageStore = new TestableMemoryMessageStore(); _protocolSession = new InternalTestProtocolSession(_virtualHost); _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); @@ -180,7 +177,7 @@ public class AckTest extends InternalBrokerBaseCase } catch (InterruptedException e) { - e.printStackTrace(); //TODO. + Thread.currentThread().interrupt(); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index a8676bf4c2..7c3098298e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -105,9 +106,9 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(); PropertiesConfiguration env = new PropertiesConfiguration(); - VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(getClass().getName(), env); - vHostConfig.setMessageStoreClass(TestableMemoryMessageStore.class.getName()); - _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), vHostConfig); + final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration(getClass().getName(), env); + vhostConfig.setMessageStoreFactoryClass(TestableMemoryMessageStoreFactory.class.getName()); + _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), vhostConfig); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, false, _virtualHost, _arguments); @@ -635,7 +636,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase qs.add(_queue); MessageMetaData metaData = msg.headersReceived(System.currentTimeMillis()); - TestableMemoryMessageStore store = (TestableMemoryMessageStore) _virtualHost.getMessageStore(); + TestableMemoryMessageStore store = (TestableMemoryMessageStore) _virtualHost.getMessageStore().getUnderlyingStore(); StoredMessage handle = store.addMessage(metaData); msg.setStoredMessage(handle); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/EventManagerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/EventManagerTest.java new file mode 100644 index 0000000000..2be79c5839 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/EventManagerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.apache.qpid.server.store.Event.AFTER_ACTIVATE; +import static org.apache.qpid.server.store.Event.BEFORE_ACTIVATE; +import junit.framework.TestCase; + +public class EventManagerTest extends TestCase +{ + private EventManager _eventManager = new EventManager(); + private EventListener _mockListener = mock(EventListener.class); + + public void testEventListenerFires() + { + _eventManager.addEventListener(_mockListener, BEFORE_ACTIVATE); + _eventManager.notifyEvent(BEFORE_ACTIVATE); + verify(_mockListener).event(BEFORE_ACTIVATE); + } + + public void testEventListenerDoesntFire() + { + _eventManager.addEventListener(_mockListener, BEFORE_ACTIVATE); + _eventManager.notifyEvent(AFTER_ACTIVATE); + verifyZeroInteractions(_mockListener); + } + + public void testEventListenerFiresMulitpleTimes() + { + _eventManager.addEventListener(_mockListener, BEFORE_ACTIVATE); + _eventManager.addEventListener(_mockListener, AFTER_ACTIVATE); + + _eventManager.notifyEvent(BEFORE_ACTIVATE); + verify(_mockListener).event(BEFORE_ACTIVATE); + + _eventManager.notifyEvent(AFTER_ACTIVATE); + verify(_mockListener).event(AFTER_ACTIVATE); + } + + public void testMultipleListenersFireForSameEvent() + { + final EventListener mockListener1 = mock(EventListener.class); + final EventListener mockListener2 = mock(EventListener.class); + + _eventManager.addEventListener(mockListener1, BEFORE_ACTIVATE); + _eventManager.addEventListener(mockListener2, BEFORE_ACTIVATE); + _eventManager.notifyEvent(BEFORE_ACTIVATE); + + verify(mockListener1).event(BEFORE_ACTIVATE); + verify(mockListener2).event(BEFORE_ACTIVATE); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 755d61a260..c589bd108b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -101,7 +101,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase String storePath = System.getProperty("QPID_WORK") + "/" + getName(); _config = new PropertiesConfiguration(); - _config.addProperty("store.class", getTestProfileMessageStoreClassName()); + _config.addProperty("store.factoryclass", getTestProfileMessageStoreFactoryClassName()); _config.addProperty("store.environment-path", storePath); cleanup(new File(storePath)); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java index 2ffa157ca8..4aa023a25c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java @@ -34,12 +34,12 @@ import org.apache.qpid.test.utils.QpidTestCase; */ public class ReferenceCountingTest extends QpidTestCase { - private TestMemoryMessageStore _store; + private TestableMemoryMessageStore _store; protected void setUp() throws Exception { - _store = new TestMemoryMessageStore(); + _store = new TestableMemoryMessageStore(); } /** diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java deleted file mode 100644 index 38d3fb78fc..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import org.apache.commons.configuration.Configuration; - -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.Bridge; -import org.apache.qpid.server.federation.BrokerLink; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.queue.AMQQueue; - -/** - * A message store that does nothing. Designed to be used in tests that do not want to use any message store - * functionality. - */ -public class SkeletonMessageStore implements MessageStore -{ - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - Configuration config, - LogSubject logSubject) throws Exception - { - } - - public void configureMessageStore(String name, - MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, - Configuration config, LogSubject logSubject) throws Exception - { - } - - public void close() throws Exception - { - } - - public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData) - { - return null; - } - - - public void createExchange(Exchange exchange) throws AMQStoreException - { - - } - - public void removeExchange(Exchange exchange) throws AMQStoreException - { - - } - - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException - { - - } - - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException - { - - } - - public void createQueue(AMQQueue queue) throws AMQStoreException - { - } - - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException - { - } - - public boolean isPersistent() - { - return false; - } - - public void removeQueue(final AMQQueue queue) throws AMQStoreException - { - - } - - public Transaction newTransaction() - { - return new Transaction() - { - - public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException - { - - } - - public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException - { - - } - - public void commitTran() throws AMQStoreException - { - - } - - public StoreFuture commitTranAsync() throws AMQStoreException - { - return new StoreFuture() - { - public boolean isComplete() - { - return true; - } - - public void waitForCompletion() - { - - } - }; - } - - public void abortTran() throws AMQStoreException - { - - } - - public void removeXid(long format, byte[] globalId, byte[] branchId) - { - } - - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) - { - } - }; - } - - public void updateQueue(AMQQueue queue) throws AMQStoreException - { - - } - - @Override - public void createBrokerLink(BrokerLink link) throws AMQStoreException - { - } - - @Override - public void deleteBrokerLink(BrokerLink link) throws AMQStoreException - { - } - - @Override - public void createBridge(Bridge bridge) throws AMQStoreException - { - } - - @Override - public void deleteBridge(Bridge bridge) throws AMQStoreException - { - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java new file mode 100644 index 0000000000..b09dcbbdf3 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + + +import junit.framework.TestCase; + +public class StateManagerTest extends TestCase +{ + + private StateManager _manager = new StateManager(); + + public void testInitialState() + { + assertEquals(State.INITIAL, _manager.getState()); + } + + public void testStateTransitionAllowed() + { + assertEquals(State.INITIAL, _manager.getState()); + + _manager.stateTransition(State.INITIAL, State.ACTIVE); + assertEquals(State.ACTIVE, _manager.getState()); + } + + public void testStateTransitionDisallowed() + { + assertEquals(State.INITIAL, _manager.getState()); + + try + { + _manager.stateTransition(State.ACTIVE, State.CLOSING); + fail("Exception not thrown"); + } + catch (IllegalStateException e) + { + // PASS + } + assertEquals(State.INITIAL, _manager.getState()); + } + + public void testIsInState() + { + assertEquals(State.INITIAL, _manager.getState()); + assertFalse(_manager.isInState(State.ACTIVE)); + assertTrue(_manager.isInState(State.INITIAL)); + } + + public void testIsNotInState() + { + assertEquals(State.INITIAL, _manager.getState()); + assertTrue(_manager.isNotInState(State.ACTIVE)); + assertFalse(_manager.isNotInState(State.INITIAL)); + } + + public void testCheckInState() + { + assertEquals(State.INITIAL, _manager.getState()); + + try + { + _manager.checkInState(State.ACTIVE); + fail("Exception not thrown"); + } + catch (IllegalStateException e) + { + // PASS + } + assertEquals(State.INITIAL, _manager.getState()); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java deleted file mode 100644 index 8a261b3b86..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Adds some extra methods to the memory message store for testing purposes. - */ -public class TestMemoryMessageStore extends MemoryMessageStore -{ - private AtomicInteger _messageCount = new AtomicInteger(0); - - - public TestMemoryMessageStore() - { - } - - @Override - public StoredMessage addMessage(StorableMessageMetaData metaData) - { - return new TestableStoredMessage(super.addMessage(metaData)); - } - - public int getMessageCount() - { - return _messageCount.get(); - } - - private class TestableStoredMessage implements StoredMessage - { - private final StoredMessage _storedMessage; - - public TestableStoredMessage(StoredMessage storedMessage) - { - _messageCount.incrementAndGet(); - _storedMessage = storedMessage; - } - - public StorableMessageMetaData getMetaData() - { - return _storedMessage.getMetaData(); - } - - public long getMessageNumber() - { - return _storedMessage.getMessageNumber(); - } - - public void addContent(int offsetInMessage, ByteBuffer src) - { - _storedMessage.addContent(offsetInMessage, src); - } - - public int getContent(int offsetInMessage, ByteBuffer dst) - { - return _storedMessage.getContent(offsetInMessage, dst); - } - - - public ByteBuffer getContent(int offsetInMessage, int size) - { - return _storedMessage.getContent(offsetInMessage, size); - } - - public StoreFuture flushToStore() - { - return _storedMessage.flushToStore(); - } - - public void remove() - { - _storedMessage.remove(); - _messageCount.decrementAndGet(); - } - - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 104e06d29a..210408f490 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.queue.AMQQueue; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; /** @@ -33,26 +34,8 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class TestableMemoryMessageStore extends MemoryMessageStore { - - private MemoryMessageStore _mms = null; - private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>(); - private AtomicInteger _messageCount = new AtomicInteger(0); - - public TestableMemoryMessageStore(MemoryMessageStore mms) - { - _mms = mms; - } - - public TestableMemoryMessageStore() - { - - } - - @Override - public void close() throws Exception - { - // Not required to do anything - } + private final Map<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>(); + private final AtomicInteger _messageCount = new AtomicInteger(0); @Override public StoredMessage addMessage(StorableMessageMetaData metaData) @@ -65,36 +48,34 @@ public class TestableMemoryMessageStore extends MemoryMessageStore return _messageCount.get(); } + public Map<Long, AMQQueue> getMessages() + { + return _messages; + } + private class TestableTransaction implements Transaction { + @Override public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { getMessages().put(message.getMessageNumber(), (AMQQueue)queue); } + @Override public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { getMessages().remove(message.getMessageNumber()); } + @Override public void commitTran() throws AMQStoreException { } + @Override public StoreFuture commitTranAsync() throws AMQStoreException { - return new StoreFuture() - { - public boolean isComplete() - { - return true; - } - - public void waitForCompletion() - { - - } - }; + return StoreFuture.IMMEDIATE_FUTURE; } public void abortTran() throws AMQStoreException @@ -117,10 +98,6 @@ public class TestableMemoryMessageStore extends MemoryMessageStore return new TestableTransaction(); } - public HashMap<Long, AMQQueue> getMessages() - { - return _messages; - } private class TestableStoredMessage implements StoredMessage { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java new file mode 100644 index 0000000000..a737836ed5 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.store.decorators.EventDecorator; +import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator; + +public class TestableMemoryMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public MessageStore createMessageStore(LogSubject logSubject) + { + return new OperationalLoggingDecorator(new EventDecorator(new TestableMemoryMessageStore()), logSubject); + } + + @Override + public String getStoreClassName() + { + return TestableMemoryMessageStore.class.getSimpleName(); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java new file mode 100644 index 0000000000..7038b8710b --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.decorators; + +import static org.mockito.Mockito.*; + +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.decorators.EventDecorator; +import org.mockito.InOrder; + +import junit.framework.TestCase; + +public class EventDecoratorTest extends TestCase +{ + private MessageStore _mockStore = mock(MessageStore.class); + private EventListener _mockListener = mock(EventListener.class); + + private EventDecorator _eventDecorator = new EventDecorator(_mockStore); + private InOrder _orderMock = inOrder(_mockListener, _mockStore); + + public void testBeforeActivateDecoration() throws Exception + { + _eventDecorator.addEventListener(_mockListener, Event.BEFORE_ACTIVATE); + _eventDecorator.activate(); + + _orderMock.verify(_mockListener).event(Event.BEFORE_ACTIVATE); + _orderMock.verify(_mockStore).activate(); + } + + public void testAfterActivateDecoration() throws Exception + { + _eventDecorator.addEventListener(_mockListener, Event.AFTER_ACTIVATE); + _eventDecorator.activate(); + + _orderMock.verify(_mockStore).activate(); + _orderMock.verify(_mockListener).event(Event.AFTER_ACTIVATE); + } + + public void testBeforeAfterActivateDecoration() throws Exception + { + _eventDecorator.addEventListener(_mockListener, Event.BEFORE_ACTIVATE); + _eventDecorator.addEventListener(_mockListener, Event.AFTER_ACTIVATE); + _eventDecorator.activate(); + + _orderMock.verify(_mockListener).event(Event.BEFORE_ACTIVATE); + _orderMock.verify(_mockStore).activate(); + _orderMock.verify(_mockListener).event(Event.AFTER_ACTIVATE); + } + + public void testBeforeAfterCloseDecoration() throws Exception + { + _eventDecorator.addEventListener(_mockListener, Event.BEFORE_CLOSE); + _eventDecorator.addEventListener(_mockListener, Event.AFTER_CLOSE); + _eventDecorator.close(); + + _orderMock.verify(_mockListener).event(Event.BEFORE_CLOSE); + _orderMock.verify(_mockStore).close(); + _orderMock.verify(_mockListener).event(Event.AFTER_CLOSE); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java new file mode 100644 index 0000000000..cf06d3ab72 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.decorators; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import static org.apache.qpid.server.store.MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY; + +import junit.framework.TestCase; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; +import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator; +import org.hamcrest.Description; +import org.mockito.ArgumentMatcher; +import org.mockito.InOrder; + +public class OperationalLoggingDecoratorTest extends TestCase +{ + private MessageStore _messageStore = mock(MessageStore.class); + private LogActor _mockActor = mock(LogActor.class); + private LogSubject _mockLogSubject = mock(LogSubject.class); + private OperationalLoggingDecorator _operationalLoggingDecorator = new OperationalLoggingDecorator(_messageStore, _mockLogSubject); + private InOrder _inOrder = inOrder(_mockActor, _messageStore); + + protected void setUp() throws Exception + { + super.setUp(); + CurrentActor.set(_mockActor); + } + + public void testConfigureMessageStore() throws Exception + { + _operationalLoggingDecorator.configureMessageStore(null,null,null,null); + + _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1001 : Created")); + _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("TXN-1001 : Created")); + _inOrder.verify(_messageStore).configureMessageStore(anyString(), any(MessageStoreRecoveryHandler.class), any(TransactionLogRecoveryHandler.class), any(Configuration.class)); + } + + public void testConfigureMessageStoreWithStoreLocation() throws Exception + { + final String storeLocation = "/my/store/location"; + Configuration mockConfig = mock(Configuration.class); + when(mockConfig.getString(ENVIRONMENT_PATH_PROPERTY)).thenReturn(storeLocation); + + _operationalLoggingDecorator.configureMessageStore(null,null,null, mockConfig); + + _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1001 : Created")); + _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("TXN-1001 : Created")); + _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1002 : Store location : " + storeLocation)); + _inOrder.verify(_messageStore).configureMessageStore(anyString(), any(MessageStoreRecoveryHandler.class), any(TransactionLogRecoveryHandler.class), any(Configuration.class)); + } + + public void testConfigureConfigStore() throws Exception + { + _operationalLoggingDecorator.configureConfigStore(null,null,null); + + _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("CFG-1001 : Created")); + _inOrder.verify(_messageStore).configureConfigStore(anyString(), any(ConfigurationRecoveryHandler.class), any(Configuration.class)); + } + + public void testActivate() throws Exception + { + _operationalLoggingDecorator.activate(); + + _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1004 : Recovery Start")); + _inOrder.verify(_messageStore).activate(); + _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1006 : Recovery Complete")); + } + + public void testClose() throws Exception + { + _operationalLoggingDecorator.close(); + + _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1003 : Closed")); + _inOrder.verify(_messageStore).close(); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + CurrentActor.remove(); + } + + private LogMessage matchesLogMessage(String expectedLogMessage) + { + return argThat(new LogMessageArgumentMatcher(expectedLogMessage)); + } + + private final class LogMessageArgumentMatcher extends ArgumentMatcher<LogMessage> + { + private final String _expectedText; + private String _description = null; +; + public LogMessageArgumentMatcher(String _expectedLogMessage) + { + this._expectedText = _expectedLogMessage; + } + + @Override + public boolean matches(Object item) + { + LogMessage logMessage = (LogMessage) item; + final String actualText = logMessage.toString(); + if (actualText.equals(_expectedText)) + { + return true; + } + else + { + _description = "Expected <" + _expectedText + "> but got <" + actualText + ">"; + return false; + } + } + + @Override + public void describeTo(Description description) + { + if (description != null) + { + description.appendText(" : "+ _description); + } + } + } +}
\ No newline at end of file diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java index e9b7ceacc5..af49238998 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -20,26 +20,13 @@ */ package org.apache.qpid.server.txn; -import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.NotImplementedException; - import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.Bridge; -import org.apache.qpid.server.federation.BrokerLink; -import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.NullMessageStore; import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; /** @@ -129,111 +116,14 @@ class MockStoreTransaction implements Transaction public static MessageStore createTestTransactionLog(final MockStoreTransaction storeTransaction) { - return new MessageStore() + return new NullMessageStore() { - public void configureMessageStore(final String name, - final MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, - final Configuration config, final LogSubject logSubject) throws Exception - { - } - - public void close() throws Exception - { - } - - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) - { - return null; - } - - public boolean isPersistent() - { - return false; - } - + @Override public Transaction newTransaction() { storeTransaction.setState(TransactionState.STARTED); return storeTransaction; } - - @Override - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - Configuration config, LogSubject logSubject) - throws Exception - { - } - - @Override - public void createExchange(Exchange exchange) - throws AMQStoreException - { - } - - @Override - public void removeExchange(Exchange exchange) - throws AMQStoreException - { - } - - @Override - public void bindQueue(Exchange exchange, AMQShortString routingKey, - AMQQueue queue, FieldTable args) throws AMQStoreException - { - } - - @Override - public void unbindQueue(Exchange exchange, - AMQShortString routingKey, AMQQueue queue, FieldTable args) - throws AMQStoreException - { - } - - @Override - public void createQueue(AMQQueue queue) throws AMQStoreException - { - } - - @Override - public void createQueue(AMQQueue queue, FieldTable arguments) - throws AMQStoreException - { - } - - @Override - public void removeQueue(AMQQueue queue) throws AMQStoreException - { - } - - @Override - public void updateQueue(AMQQueue queue) throws AMQStoreException - { - } - - @Override - public void createBrokerLink(BrokerLink link) - throws AMQStoreException - { - } - - @Override - public void deleteBrokerLink(BrokerLink link) - throws AMQStoreException - { - } - - @Override - public void createBridge(Bridge bridge) throws AMQStoreException - { - } - - @Override - public void deleteBridge(Bridge bridge) throws AMQStoreException - { - } - - }; + }; } }
\ No newline at end of file diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 9df0aec545..6b48d55fae 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -65,10 +66,10 @@ public class InternalBrokerBaseCase extends QpidTestCase super.setUp(); _configXml.addProperty("virtualhosts.virtualhost.name", "test"); - _configXml.addProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName()); + _configXml.addProperty("virtualhosts.virtualhost.test.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName()); _configXml.addProperty("virtualhosts.virtualhost(-1).name", getName()); - _configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName()); + _configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.factoryclass", TestableMemoryMessageStoreFactory.class.getName()); createBroker(); } @@ -97,7 +98,7 @@ public class InternalBrokerBaseCase extends QpidTestCase _virtualHost.getBindingFactory().addBinding(QUEUE_NAME.toString(), _queue, defaultExchange, null); _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test"); - _messageStore = _virtualHost.getMessageStore(); + _messageStore = _virtualHost.getMessageStore().getUnderlyingStore(); _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()), false, new AMQShortString("testowner"), false, false, _virtualHost, null); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index cccf02c9f3..f27dc33dc3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -275,4 +275,10 @@ public class MockVirtualHost implements VirtualHost { } + + @Override + public State getState() + { + return State.ACTIVE; + } }
\ No newline at end of file diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java index df7b4da426..87eb0f9d16 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java @@ -27,7 +27,7 @@ import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.MemoryMessageStoreFactory; import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.test.utils.QpidTestCase; @@ -68,31 +68,6 @@ public class VirtualHostImplTest extends QpidTestCase customBindingTestImpl(new String[0]); } - private void customBindingTestImpl(final String[] routingKeys) throws Exception - { - String exchangeName = getName() +".direct"; - String vhostName = getName(); - String queueName = getName(); - - File config = writeConfigFile(vhostName, queueName, exchangeName, false, routingKeys); - VirtualHost vhost = createVirtualHost(vhostName, config); - assertNotNull("virtualhost should exist", vhost); - - AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName); - assertNotNull("queue should exist", queue); - - Exchange defaultExch = vhost.getExchangeRegistry().getDefaultExchange(); - assertTrue("queue should have been bound to default exchange with its name", defaultExch.isBound(queueName, queue)); - - Exchange exch = vhost.getExchangeRegistry().getExchange(exchangeName); - assertTrue("queue should have been bound to " + exchangeName + " with its name", exch.isBound(queueName, queue)); - - for(String key: routingKeys) - { - assertTrue("queue should have been bound to " + exchangeName + " with key " + key, exch.isBound(key, queue)); - } - } - /** * Tests that specifying custom routing keys for a queue in the configuration file results in failure * to create the vhost (since this is illegal, only queue names are used with the default exchange) @@ -106,12 +81,32 @@ public class VirtualHostImplTest extends QpidTestCase createVirtualHost(getName(), config); fail("virtualhost creation should have failed due to illegal configuration"); } - catch (ConfigurationException e) + catch (RuntimeException e) { + assertEquals(ConfigurationException.class, e.getCause().getClass()); //expected } } + public void testVirtualHostBecomesActive() throws Exception + { + File config = writeConfigFile(getName(), getName(), getName() +".direct", false, new String[0]); + VirtualHost vhost = createVirtualHost(getName(), config); + assertNotNull(vhost); + assertEquals(State.ACTIVE, vhost.getState()); + } + + public void testVirtualHostBecomesStoppedOnClose() throws Exception + { + File config = writeConfigFile(getName(), getName(), getName() +".direct", false, new String[0]); + VirtualHost vhost = createVirtualHost(getName(), config); + assertNotNull(vhost); + assertEquals(State.ACTIVE, vhost.getState()); + vhost.close(); + assertEquals(State.STOPPED, vhost.getState()); + assertEquals(0, vhost.getHouseKeepingActiveCount()); + } + /** * Tests that specifying an unknown exchange to bind the queue to results in failure to create the vhost */ @@ -124,12 +119,39 @@ public class VirtualHostImplTest extends QpidTestCase createVirtualHost(getName(), config); fail("virtualhost creation should have failed due to illegal configuration"); } - catch (ConfigurationException e) + catch (RuntimeException e) { + assertEquals(ConfigurationException.class, e.getCause().getClass()); //expected } } + private void customBindingTestImpl(final String[] routingKeys) throws Exception + { + String exchangeName = getName() +".direct"; + String vhostName = getName(); + String queueName = getName(); + + File config = writeConfigFile(vhostName, queueName, exchangeName, false, routingKeys); + VirtualHost vhost = createVirtualHost(vhostName, config); + assertNotNull("virtualhost should exist", vhost); + + AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName); + assertNotNull("queue should exist", queue); + + Exchange defaultExch = vhost.getExchangeRegistry().getDefaultExchange(); + assertTrue("queue should have been bound to default exchange with its name", defaultExch.isBound(queueName, queue)); + + Exchange exch = vhost.getExchangeRegistry().getExchange(exchangeName); + assertTrue("queue should have been bound to " + exchangeName + " with its name", exch.isBound(queueName, queue)); + + for(String key: routingKeys) + { + assertTrue("queue should have been bound to " + exchangeName + " with key " + key, exch.isBound(key, queue)); + } + } + + private VirtualHost createVirtualHost(String vhostName, File config) throws Exception { _configuration = new ServerConfiguration(new XMLConfiguration(config)); @@ -167,11 +189,11 @@ public class VirtualHostImplTest extends QpidTestCase writer.write("<virtualhosts>"); writer.write(" <default>" + vhostName + "</default>"); writer.write(" <virtualhost>"); - writer.write(" <store>"); - writer.write(" <class>" + TestableMemoryMessageStore.class.getName() + "</class>"); - writer.write(" </store>"); writer.write(" <name>" + vhostName + "</name>"); writer.write(" <" + vhostName + ">"); + writer.write(" <store>"); + writer.write(" <factoryclass>" + MemoryMessageStoreFactory.class.getName() + "</factoryclass>"); + writer.write(" </store>"); if(exchangeName != null && !dontDeclare) { writer.write(" <exchanges>"); |
