summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-03-30 13:44:25 +0000
committerKeith Wall <kwall@apache.org>2012-03-30 13:44:25 +0000
commit38d1f36fe4238a887f867350adaa56489e53e0e6 (patch)
tree1a5504424a30e6fce56e89123c6036bed002d05b /qpid/java/broker
parentda8070494a06d0b6c37127eb0a3439e394bddd31 (diff)
downloadqpid-python-38d1f36fe4238a887f867350adaa56489e53e0e6.tar.gz
QPID-3917: Refactor VirtualHost/MessageStore implementations to be ready for BDB-HA
Applied patch from Andrew MacBean <andymacbean@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1307416 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/etc/virtualhosts.xml12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java59
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java2
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java25
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java48
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java164
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java (renamed from qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java)22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java27
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java145
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java31
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java60
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java188
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java58
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java86
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java (renamed from qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java)301
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java43
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java29
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java2
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java510
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java51
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java58
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java11
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java9
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/EventManagerTest.java72
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java179
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java89
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java98
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java49
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java41
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java79
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java157
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java118
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java84
69 files changed, 1938 insertions, 1296 deletions
diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml
index 4dcdcda6d2..1f7f91de9a 100644
--- a/qpid/java/broker/etc/virtualhosts.xml
+++ b/qpid/java/broker/etc/virtualhosts.xml
@@ -25,8 +25,8 @@
<name>localhost</name>
<localhost>
<store>
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
- <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class>
+ <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
+ <!--<factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass>
<environment-path>${QPID_WORK}/derbystore</environment-path>-->
</store>
@@ -86,8 +86,8 @@
<name>development</name>
<development>
<store>
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
- <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class>
+ <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
+ <!--<factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass>
<environment-path>${QPID_WORK}/derbystore</environment-path>-->
</store>
@@ -125,8 +125,8 @@
<name>test</name>
<test>
<store>
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
- <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class>
+ <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
+ <!--<factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass>
<environment-path>${QPID_WORK}/derbystore</environment-path>-->
</store>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index b388def86c..7ef06ce0f8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -37,7 +37,6 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueueMBean;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -60,8 +59,6 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
private final QueueRegistry _queueRegistry;
private final ExchangeRegistry _exchangeRegistry;
private final ExchangeFactory _exchangeFactory;
- private final Exchange _defaultExchange;
- private final DurableConfigurationStore _durableConfig;
private final VirtualHostImpl.VirtualHostMBean _virtualHostMBean;
@@ -75,8 +72,6 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
_queueRegistry = virtualHost.getQueueRegistry();
_exchangeRegistry = virtualHost.getExchangeRegistry();
- _defaultExchange = _exchangeRegistry.getDefaultExchange();
- _durableConfig = virtualHost.getMessageStore();
_exchangeFactory = virtualHost.getExchangeFactory();
}
@@ -181,7 +176,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
_exchangeRegistry.registerExchange(exchange);
if (durable)
{
- _durableConfig.createExchange(exchange);
+ getVirtualHost().getMessageStore().createExchange(exchange);
}
}
else
@@ -275,10 +270,10 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
false, false, getVirtualHost(), args);
if (queue.isDurable() && !queue.isAutoDelete())
{
- _durableConfig.createQueue(queue, args);
+ getVirtualHost().getMessageStore().createQueue(queue, args);
}
- virtualHost.getBindingFactory().addBinding(queueName, queue, _defaultExchange, null);
+ virtualHost.getBindingFactory().addBinding(queueName, queue, _exchangeRegistry.getDefaultExchange(), null);
}
catch (AMQException ex)
{
@@ -317,7 +312,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
queue.delete();
if (queue.isDurable())
{
- _durableConfig.removeQueue(queue);
+ getVirtualHost().getMessageStore().removeQueue(queue);
}
}
catch (AMQException ex)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
index 250c417ef1..2460be4705 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
@@ -34,7 +34,6 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BindingMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collections;
@@ -44,38 +43,14 @@ import java.util.concurrent.ConcurrentHashMap;
public class BindingFactory
{
private final VirtualHost _virtualHost;
- private final DurableConfigurationStore.Source _configSource;
- private final Exchange _defaultExchange;
private final ConcurrentHashMap<BindingImpl, BindingImpl> _bindings = new ConcurrentHashMap<BindingImpl, BindingImpl>();
-
public BindingFactory(final VirtualHost vhost)
{
- this(vhost, vhost.getExchangeRegistry().getDefaultExchange());
- }
-
- public BindingFactory(final DurableConfigurationStore.Source configSource, final Exchange defaultExchange)
- {
- _configSource = configSource;
- _defaultExchange = defaultExchange;
- if (configSource instanceof VirtualHost)
- {
- _virtualHost = (VirtualHost) configSource;
- }
- else
- {
- _virtualHost = null;
- }
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
+ _virtualHost = vhost;
}
-
-
private final class BindingImpl extends Binding implements AMQQueue.Task, Exchange.Task, BindingConfig
{
private final BindingLogSubject _logSubject;
@@ -156,30 +131,38 @@ public class BindingFactory
private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException
{
assert queue != null;
+ final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+
if (bindingKey == null)
{
bindingKey = "";
}
if (exchange == null)
{
- exchange = _defaultExchange;
+ exchange = defaultExchange;
}
if (arguments == null)
{
arguments = Collections.emptyMap();
}
+ if (exchange == null)
+ {
+ throw new IllegalArgumentException("exchange cannot be null");
+ }
+
// The default exchange bindings must reflect the existence of queues, allow
// all operations on it to succeed. It is up to the broker to prevent illegal
// attempts at binding to this exchange, not the ACLs.
- if(exchange != _defaultExchange)
+ if(exchange != defaultExchange)
{
//Perform ACLs
- if (!getVirtualHost().getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey)))
+ if (!_virtualHost.getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey)))
{
throw new AMQSecurityException("Permission denied: binding " + bindingKey);
}
}
+
BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments);
BindingImpl existingMapping = _bindings.putIfAbsent(b,b);
@@ -192,7 +175,7 @@ public class BindingFactory
if (b.isDurable() && !restore)
{
- _configSource.getMessageStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments));
+ _virtualHost.getMessageStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments));
}
queue.addQueueDeleteTask(b);
@@ -212,7 +195,7 @@ public class BindingFactory
private ConfigStore getConfigStore()
{
- return getVirtualHost().getConfigStore();
+ return _virtualHost.getConfigStore();
}
public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException
@@ -229,13 +212,15 @@ public class BindingFactory
public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
{
assert queue != null;
+ final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+
if (bindingKey == null)
{
bindingKey = "";
}
if (exchange == null)
{
- exchange = _defaultExchange;
+ exchange = defaultExchange;
}
if (arguments == null)
{
@@ -245,10 +230,10 @@ public class BindingFactory
// The default exchange bindings must reflect the existence of queues, allow
// all operations on it to succeed. It is up to the broker to prevent illegal
// attempts at binding to this exchange, not the ACLs.
- if(exchange != _defaultExchange)
+ if(exchange != defaultExchange)
{
// Check access
- if (!getVirtualHost().getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue))
+ if (!_virtualHost.getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue))
{
throw new AMQSecurityException("Permission denied: unbinding " + bindingKey);
}
@@ -265,7 +250,7 @@ public class BindingFactory
if (b.isDurable())
{
- _configSource.getMessageStore().unbindQueue(exchange,
+ _virtualHost.getMessageStore().unbindQueue(exchange,
new AMQShortString(bindingKey),
queue,
FieldTable.convertToFieldTable(arguments));
@@ -280,13 +265,15 @@ public class BindingFactory
public Binding getBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments)
{
assert queue != null;
+ final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+
if(bindingKey == null)
{
bindingKey = "";
}
if(exchange == null)
{
- exchange = _defaultExchange;
+ exchange = defaultExchange;
}
if(arguments == null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 558311fc46..5f472b6ddd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStoreFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -102,14 +103,14 @@ public class VirtualHostConfiguration extends ConfigurationPlugin
return getConfig().subset("store");
}
- public String getMessageStoreClass()
+ public String getMessageStoreFactoryClass()
{
- return getStringValue("store.class", MemoryMessageStore.class.getName());
+ return getStringValue("store.factoryclass", MemoryMessageStoreFactory.class.getName());
}
- public void setMessageStoreClass(String storeClass)
+ public void setMessageStoreFactoryClass(String storeFactoryClass)
{
- getConfig().setProperty("store.class", storeClass);
+ getConfig().setProperty("store.factoryclass", storeFactoryClass);
}
public List getExchanges()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 9159489299..4a58314f51 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -46,11 +46,19 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
/** Close all of the currently open connections. */
public void close()
{
- _logger.debug("Closing connection registry :" + _registry.size() + " connections.");
+ close(IConnectionRegistry.BROKER_SHUTDOWN_REPLY_TEXT);
+ }
+
+ public void close(final String replyText)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Closing connection registry :" + _registry.size() + " connections.");
+ }
while (!_registry.isEmpty())
{
AMQConnectionModel connection = _registry.get(0);
- closeConnection(connection, AMQConstant.CONNECTION_FORCED, "Broker is shutting down");
+ closeConnection(connection, AMQConstant.CONNECTION_FORCED, replyText);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
index 89582e5748..954c448b72 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
@@ -28,12 +28,17 @@ import java.util.List;
public interface IConnectionRegistry
{
+ public static final String BROKER_SHUTDOWN_REPLY_TEXT = "Broker is shutting down";
+ public static final String VHOST_PASSIVATE_REPLY_TEXT = "Virtual host is being passivated";
+
public void initialise();
public void close() throws AMQException;
-
+
+ public void close(String replyText) throws AMQException;
+
public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message);
-
+
public List<AMQConnectionModel> getConnections();
public void registerConnection(AMQConnectionModel connnection);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index cae07046fa..af49168a80 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.binding.Binding;
@@ -125,10 +123,18 @@ public abstract class AbstractExchange implements Exchange, Managable
_autoDelete = autoDelete;
_ticket = ticket;
- // TODO - fix
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
+ createAndRegisterMBean();
+ _logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
+
+ // Log Exchange creation
+ CurrentActor.get().message(ExchangeMessages.CREATED(String.valueOf(getTypeShortString()), String.valueOf(name), durable));
+ }
+
+ private void createAndRegisterMBean()
+ {
try
{
_exchangeMbean = createMBean();
@@ -136,12 +142,8 @@ public abstract class AbstractExchange implements Exchange, Managable
}
catch (JMException e)
{
- getLogger().error(e);
+ throw new RuntimeException("Failed to register mbean",e);
}
- _logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
-
- // Log Exchange creation
- CurrentActor.get().message(ExchangeMessages.CREATED(String.valueOf(getTypeShortString()), String.valueOf(name), durable));
}
public ConfigStore getConfigStore()
@@ -149,8 +151,6 @@ public abstract class AbstractExchange implements Exchange, Managable
return getVirtualHost().getConfigStore();
}
- public abstract Logger getLogger();
-
public boolean isDurable()
{
return _durable;
@@ -324,8 +324,7 @@ public abstract class AbstractExchange implements Exchange, Managable
public Map<String, Object> getArguments()
{
- // TODO - Fix
- return Collections.EMPTY_MAP;
+ return Collections.emptyMap();
}
public UUID getId()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index ebe0645bc4..33e73b4668 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -36,7 +35,7 @@ import java.util.concurrent.ConcurrentMap;
public class DefaultExchangeRegistry implements ExchangeRegistry
{
- private static final Logger _log = Logger.getLogger(DefaultExchangeRegistry.class);
+ private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
/**
* Maps from exchange name to exchange instance
@@ -59,8 +58,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this, getDurableConfigurationStore());
}
-
-
public DurableConfigurationStore getDurableConfigurationStore()
{
return _host.getMessageStore();
@@ -153,4 +150,28 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
}
}
+ @Override
+ public void clearAndUnregisterMbeans()
+ {
+ for (final AMQShortString exchangeName : getExchangeNames())
+ {
+ final Exchange exchange = getExchange(exchangeName);
+
+ if (exchange instanceof AbstractExchange)
+ {
+ AbstractExchange abstractExchange = (AbstractExchange) exchange;
+ try
+ {
+ abstractExchange.getManagedObject().unregister();
+ }
+ catch (AMQException e)
+ {
+ LOGGER.warn("Failed to unregister mbean", e);
+ }
+ }
+ }
+ _exchangeMap.clear();
+ _exchangeMapStr.clear();
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index 8c0a5001db..9525324f57 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -131,12 +131,6 @@ public class DirectExchange extends AbstractExchange
return new DirectExchangeMBean(this);
}
- public Logger getLogger()
- {
- return _logger;
- }
-
-
public List<? extends BaseQueue> doRoute(InboundMessage payload)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
index 4dfcce7bbe..335efaeaa2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
@@ -27,7 +27,8 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
public class ExchangeInitialiser
{
- public void initialise(ExchangeFactory factory, ExchangeRegistry registry, DurableConfigurationStore store) throws AMQException{
+ public void initialise(ExchangeFactory factory, ExchangeRegistry registry, DurableConfigurationStore store) throws AMQException
+ {
for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes())
{
define (registry, factory, type.getDefaultExchangeName(), type.getName(), store);
@@ -44,7 +45,6 @@ public class ExchangeInitialiser
{
Exchange exchange = f.createExchange(name, type, true, false, 0);
r.registerExchange(exchange);
-
if(exchange.isDurable())
{
store.createExchange(exchange);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
index 18eb37e037..db244c114b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
@@ -51,5 +51,7 @@ public interface ExchangeRegistry
Exchange getExchange(String exchangeName);
- void unregisterExchange(String exchange, boolean ifUnused) throws ExchangeInUseException, AMQException;;
+ void unregisterExchange(String exchange, boolean ifUnused) throws ExchangeInUseException, AMQException;
+
+ void clearAndUnregisterMbeans();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 76f86ea1b4..f9ad2fad87 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -52,11 +52,6 @@ public class FanoutExchange extends AbstractExchange
return new FanoutExchangeMBean(this);
}
- public Logger getLogger()
- {
- return _logger;
- }
-
public static final ExchangeType<FanoutExchange> TYPE = new ExchangeType<FanoutExchange>()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 295a7191e7..2700a7cda3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -231,11 +231,6 @@ public class HeadersExchange extends AbstractExchange
return new HeadersExchangeMBean(this);
}
- public Logger getLogger()
- {
- return _logger;
- }
-
protected void onBind(final Binding binding)
{
String bindingKey = binding.getBindingKey();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 27166e4384..7ce84b7a89 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -407,11 +407,6 @@ public class TopicExchange extends AbstractExchange
return new TopicExchangeMBean(this);
}
- public Logger getLogger()
- {
- return _logger;
- }
-
private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index 79fcfb6d76..eab28ac9d4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.State;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
@@ -82,6 +83,10 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied: '" + virtualHost.getName() + "'");
}
+ else if (virtualHost.getState() != State.ACTIVE)
+ {
+ throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active");
+ }
session.setVirtualHost(virtualHost);
@@ -89,10 +94,10 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
if (session.getContextKey() == null)
{
session.setContextKey(generateClientID());
- }
+ }
MethodRegistry methodRegistry = session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
+ AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
stateManager.changeState(AMQState.CONNECTION_OPEN);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties
index 3bc5074777..541f8b8c68 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties
@@ -18,8 +18,7 @@
#
# Default File used for all non-defined locales.
-# 0 - name
-CREATED = CFG-1001 : Created : {0}
+CREATED = CFG-1001 : Created
# 0 - path
STORE_LOCATION = CFG-1002 : Store location : {0}
CLOSE = CFG-1003 : Closed
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties
index a2cedeb22a..081f2bbca3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties
@@ -18,11 +18,11 @@
#
# Default File used for all non-defined locales.
#
-# 0 - name
-CREATED = MST-1001 : Created : {0}
+CREATED = MST-1001 : Created
# 0 - path
STORE_LOCATION = MST-1002 : Store location : {0}
CLOSED = MST-1003 : Closed
RECOVERY_START = MST-1004 : Recovery Start
RECOVERED = MST-1005 : Recovered {0,number} messages
-RECOVERY_COMPLETE = MST-1006 : Recovery Complete \ No newline at end of file
+RECOVERY_COMPLETE = MST-1006 : Recovery Complete
+PASSIVATE = MST-1007 : Store Passivated
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties
index 9ef58df940..b9e87159a6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties
@@ -19,8 +19,7 @@
# Default File used for all non-defined locales.
#
#
-# 0 - name
-CREATED = TXN-1001 : Created : {0}
+CREATED = TXN-1001 : Created
# 0 - path
STORE_LOCATION = TXN-1002 : Store location : {0}
CLOSED = TXN-1003 : Closed
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
index 8f0b9182a9..6f18cbcc6b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
@@ -40,7 +40,8 @@ public class BindingLogSubject extends AbstractLogSubject
public BindingLogSubject(String routingKey, Exchange exchange,
AMQQueue queue)
{
- setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(),
+ setLogStringWithFormat(BINDING_FORMAT,
+ queue.getVirtualHost().getName(),
exchange.getTypeShortString(),
exchange.getNameShortString(),
queue.getNameShortString(),
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
index 969288be00..08963bd8f1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.logging.subjects;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.STORE_FORMAT;
@@ -28,10 +27,9 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.STORE_FOR
public class MessageStoreLogSubject extends AbstractLogSubject
{
- /** Create an ExchangeLogSubject that Logs in the following format. */
- public MessageStoreLogSubject(VirtualHost vhost, MessageStore store)
+ /** Create an MessageStoreLogSubject that Logs in the following format. */
+ public MessageStoreLogSubject(VirtualHost vhost, String messageStoreName)
{
- setLogStringWithFormat(STORE_FORMAT, vhost.getName(),
- store.getClass().getSimpleName());
+ setLogStringWithFormat(STORE_FORMAT, vhost.getName(), messageStoreName);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
index d76487073d..801fe55939 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -29,6 +32,8 @@ import java.util.concurrent.ConcurrentMap;
public class DefaultQueueRegistry implements QueueRegistry
{
+ private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
+
private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private final VirtualHost _virtualHost;
@@ -72,4 +77,22 @@ public class DefaultQueueRegistry implements QueueRegistry
{
return getQueue(new AMQShortString(queue));
}
+
+ @Override
+ public void stopAllAndUnregisterMBeans()
+ {
+ for (final AMQQueue queue : getQueues())
+ {
+ queue.stop();
+ try
+ {
+ queue.getManagedObject().unregister();
+ }
+ catch (AMQException e)
+ {
+ LOGGER.warn("Failed to unregister mbean", e);
+ }
+ }
+ _queueMap.clear();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
index 80f6bd1493..1ffc0a3560 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
@@ -40,4 +40,6 @@ public interface QueueRegistry
Collection<AMQQueue> getQueues();
AMQQueue getQueue(String queue);
+
+ void stopAllAndUnregisterMBeans();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index ea9621ff41..13c24e624e 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.queue.AMQQueue;
public interface DurableConfigurationStore
@@ -46,13 +45,11 @@ public interface DurableConfigurationStore
* @param name The name to be used by this store
* @param recoveryHandler Handler to be called as the store recovers on start up
* @param config The apache commons configuration object.
- *
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
- Configuration config,
- LogSubject logSubject) throws Exception;
+ Configuration config) throws Exception;
/**
* Makes the specified exchange persistent.
*
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
new file mode 100644
index 0000000000..95b0186027
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public enum Event
+{
+ BEFORE_ACTIVATE,
+ AFTER_ACTIVATE,
+ BEFORE_PASSIVATE,
+ AFTER_PASSIVATE,
+ BEFORE_CLOSE,
+ AFTER_CLOSE
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java
new file mode 100644
index 0000000000..33ae7b5b24
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public interface EventListener
+{
+ public void event(Event event);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java
new file mode 100644
index 0000000000..3e10f758f9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class EventManager
+{
+ private ConcurrentMap<Event, List<EventListener>> _listeners = new ConcurrentHashMap<Event, List<EventListener>>();
+
+ public void addEventListener(EventListener listener, Event event)
+ {
+ _listeners.putIfAbsent(event, new ArrayList<EventListener>());
+ final List<EventListener> list = _listeners.get(event);
+ list.add(listener);
+ }
+
+ public void notifyEvent(Event event)
+ {
+ if (_listeners.containsKey(event))
+ {
+ for (EventListener listener : _listeners.get(event))
+ {
+ listener.event(event);
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index b6a5e80640..de9e73f914 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -20,103 +20,58 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-/** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore implements MessageStore
+/** A simple message store that stores the messages in a thread-safe structure in memory. */
+public class MemoryMessageStore extends NullMessageStore
{
- private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
-
- private static final int DEFAULT_HASHTABLE_CAPACITY = 50000;
-
- private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
-
-
private final AtomicLong _messageId = new AtomicLong(1);
- private AtomicBoolean _closed = new AtomicBoolean(false);
- private LogSubject _logSubject;
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
{
- public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ @Override
+ public StoreFuture commitTranAsync() throws AMQStoreException
{
+ return StoreFuture.IMMEDIATE_FUTURE;
}
- public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ @Override
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
}
- public void commitTran() throws AMQStoreException
+ @Override
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
}
- public StoreFuture commitTranAsync() throws AMQStoreException
+ @Override
+ public void commitTran() throws AMQStoreException
{
- return StoreFuture.IMMEDIATE_FUTURE;
}
+ @Override
public void abortTran() throws AMQStoreException
{
}
+ @Override
public void removeXid(long format, byte[] globalId, byte[] branchId)
{
}
+ @Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
{
}
-
};
- public void configureConfigStore(String name, ConfigurationRecoveryHandler handler, Configuration configuration, LogSubject logSubject) throws Exception
- {
- _logSubject = logSubject;
- CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
-
-
- }
-
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration config, LogSubject logSubject) throws Exception
- {
- if(_logSubject == null)
- {
- _logSubject = logSubject;
- }
- int hashtableCapacity = config.getInt(name + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
- _log.info("Using capacity " + hashtableCapacity + " for hash tables");
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
- }
-
- public void close() throws Exception
- {
- _closed.getAndSet(true);
- CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
-
- }
-
+ @Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
final long id = _messageId.getAndIncrement();
@@ -125,96 +80,21 @@ public class MemoryMessageStore implements MessageStore
return message;
}
-
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
-
- }
-
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
-
- }
-
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
- {
-
- }
-
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
- {
-
- }
-
-
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- // Not requred to do anything
- }
-
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
- {
- // Not required to do anything
- }
-
- public void removeQueue(final AMQQueue queue) throws AMQStoreException
- {
- // Not required to do anything
- }
-
- public void updateQueue(final AMQQueue queue) throws AMQStoreException
- {
- // Not required to do anything
- }
-
- public void createBrokerLink(final BrokerLink link) throws AMQStoreException
- {
-
- }
-
- public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
- {
-
- }
-
- public void createBridge(final Bridge bridge) throws AMQStoreException
- {
-
- }
-
- public void deleteBridge(final Bridge bridge) throws AMQStoreException
- {
-
- }
-
+ @Override
public Transaction newTransaction()
{
return IN_MEMORY_TRANSACTION;
}
-
- public List<AMQQueue> createQueues() throws AMQException
- {
- return null;
- }
-
- public Long getNewMessageId()
- {
- return _messageId.getAndIncrement();
- }
-
+ @Override
public boolean isPersistent()
{
return false;
}
- private void checkNotClosed() throws MessageStoreClosedException
- {
- if (_closed.get())
- {
- throw new MessageStoreClosedException();
- }
+ @Override
+ public void close() throws Exception
+ {
+ _closed.getAndSet(true);
}
-
-
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
index fc5d2a4e42..0fb7b1f84f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,23 +20,22 @@
package org.apache.qpid.server.store;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.decorators.EventDecorator;
+import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator;
-public abstract class AbstractMessageStore implements MessageStore
+public class MemoryMessageStoreFactory implements MessageStoreFactory
{
- private LogSubject _logSubject;
- public void configure(VirtualHost virtualHost) throws Exception
+ @Override
+ public MessageStore createMessageStore(LogSubject logSubject)
{
- _logSubject = new MessageStoreLogSubject(virtualHost, this);
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+ return new OperationalLoggingDecorator(new EventDecorator(new MemoryMessageStore()), logSubject);
}
- public void close() throws Exception
+ @Override
+ public String getStoreClassName()
{
- CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+ return MemoryMessageStore.class.getSimpleName();
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 2114472592..c76d4f223f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.store;
import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.logging.LogSubject;
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
@@ -42,9 +41,9 @@ public interface MessageStore extends DurableConfigurationStore
void configureMessageStore(String name,
MessageStoreRecoveryHandler messageRecoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration config, LogSubject logSubject) throws Exception;
-
+ Configuration config) throws Exception;
+ void activate() throws Exception;
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
@@ -65,4 +64,8 @@ public interface MessageStore extends DurableConfigurationStore
*/
void close() throws Exception;
+ void addEventListener(EventListener eventListener, Event event);
+
+ MessageStore getUnderlyingStore();
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java
new file mode 100644
index 0000000000..aba7456a44
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public class MessageStoreConstants
+{
+
+ public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java
new file mode 100644
index 0000000000..878798eac3
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.logging.LogSubject;
+
+public interface MessageStoreFactory
+{
+ MessageStore createMessageStore(LogSubject logSubject);
+
+ String getStoreClassName();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
new file mode 100644
index 0000000000..0b55f74730
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class NullMessageStore implements MessageStore
+{
+ @Override
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration config) throws Exception
+ {
+ }
+
+ @Override
+ public void createExchange(Exchange exchange) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void removeExchange(Exchange exchange) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void removeQueue(AMQQueue queue) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void updateQueue(AMQQueue queue) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void createBridge(final Bridge bridge) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void deleteBridge(final Bridge bridge) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
+ {
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ }
+
+ @Override
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return null;
+ }
+
+ @Override
+ public void activate() throws Exception
+ {
+ }
+
+ @Override
+ public void addEventListener(EventListener eventListener, Event event)
+ {
+ }
+
+ @Override
+ public MessageStore getUnderlyingStore()
+ {
+ return this;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java
new file mode 100644
index 0000000000..7928f613d9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public enum State
+{
+ INITIAL,
+ CONFIGURING,
+ RECOVERING,
+ ACTIVE,
+ CLOSING,
+ CLOSED
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
new file mode 100644
index 0000000000..41b3cb81bb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+
+public class StateManager
+{
+ private State _state = State.INITIAL;
+
+ public synchronized State getState()
+ {
+ return _state;
+ }
+
+ public synchronized void stateTransition(final State current, final State desired)
+ {
+ if (_state != current)
+ {
+ throw new IllegalStateException("Cannot transition to the state: " + desired + "; need to be in state: " + current
+ + "; currently in state: " + _state);
+ }
+ _state = desired;
+ }
+
+ public synchronized boolean isInState(State testedState)
+ {
+ return _state.equals(testedState);
+ }
+
+ public synchronized boolean isNotInState(State testedState)
+ {
+ return !isInState(testedState);
+ }
+
+ public synchronized void checkInState(State checkedState)
+ {
+ if (isNotInState(checkedState))
+ {
+ throw new IllegalStateException("Unexpected state. Was : " + _state + " but expected : " + checkedState);
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java
index 3e720d9de1..7d3bf90a75 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java
@@ -31,11 +31,10 @@ public interface StoreFuture
public void waitForCompletion()
{
-
}
};
boolean isComplete();
void waitForCompletion();
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java
new file mode 100644
index 0000000000..a402e6ee5c
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.decorators;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+
+/**
+ * AbstractDecorator. All methods <bMUST</b> perform simple
+ * delegation to their equivalent decorated counterpart without
+ * change.
+ */
+public class AbstractDecorator implements MessageStore
+{
+ protected final MessageStore _decoratedStore;
+
+ public AbstractDecorator(MessageStore store)
+ {
+ _decoratedStore = store;
+ }
+
+ @Override
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler messageRecoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration config) throws Exception
+ {
+ _decoratedStore.configureMessageStore(name, messageRecoveryHandler,
+ tlogRecoveryHandler, config);
+ }
+
+ @Override
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
+ {
+ _decoratedStore.configureConfigStore(name, recoveryHandler, config);
+ }
+
+ @Override
+ public void activate() throws Exception
+ {
+ _decoratedStore.activate();
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ _decoratedStore.close();
+ }
+
+ @Override
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(
+ T metaData)
+ {
+ return _decoratedStore.addMessage(metaData);
+ }
+
+ @Override
+ public void createExchange(Exchange exchange) throws AMQStoreException
+ {
+ _decoratedStore.createExchange(exchange);
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return _decoratedStore.isPersistent();
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return _decoratedStore.newTransaction();
+ }
+
+ @Override
+ public void removeExchange(Exchange exchange) throws AMQStoreException
+ {
+ _decoratedStore.removeExchange(exchange);
+ }
+
+ @Override
+ public void addEventListener(EventListener eventListener, Event event)
+ {
+ _decoratedStore.addEventListener(eventListener, event);
+ }
+
+ @Override
+ public void bindQueue(Exchange exchange, AMQShortString routingKey,
+ AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+ _decoratedStore.bindQueue(exchange, routingKey, queue, args);
+ }
+
+ @Override
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey,
+ AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+ _decoratedStore.unbindQueue(exchange, routingKey, queue, args);
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue) throws AMQStoreException
+ {
+ _decoratedStore.createQueue(queue);
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue, FieldTable arguments)
+ throws AMQStoreException
+ {
+ _decoratedStore.createQueue(queue, arguments);
+ }
+
+ @Override
+ public void removeQueue(AMQQueue queue) throws AMQStoreException
+ {
+ _decoratedStore.removeQueue(queue);
+ }
+
+ @Override
+ public void updateQueue(AMQQueue queue) throws AMQStoreException
+ {
+ _decoratedStore.updateQueue(queue);
+ }
+
+ @Override
+ public void createBrokerLink(BrokerLink link) throws AMQStoreException
+ {
+ _decoratedStore.createBrokerLink(link);
+ }
+
+ @Override
+ public void deleteBrokerLink(BrokerLink link) throws AMQStoreException
+ {
+ _decoratedStore.deleteBrokerLink(link);
+ }
+
+ @Override
+ public void createBridge(Bridge bridge) throws AMQStoreException
+ {
+ _decoratedStore.createBridge(bridge);
+ }
+
+ @Override
+ public void deleteBridge(Bridge bridge) throws AMQStoreException
+ {
+ _decoratedStore.deleteBridge(bridge);
+ }
+
+ @Override
+ public MessageStore getUnderlyingStore()
+ {
+ return _decoratedStore.getUnderlyingStore();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java
new file mode 100644
index 0000000000..dbf179c2e6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.decorators;
+
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.EventManager;
+import org.apache.qpid.server.store.MessageStore;
+
+public class EventDecorator extends AbstractDecorator
+{
+ protected final EventManager _eventManager;
+
+ public EventDecorator(MessageStore store)
+ {
+ super(store);
+ _eventManager = new EventManager();
+ }
+
+ @Override
+ public void activate() throws Exception
+ {
+ _eventManager.notifyEvent(Event.BEFORE_ACTIVATE);
+ _decoratedStore.activate();
+ _eventManager.notifyEvent(Event.AFTER_ACTIVATE);
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ _eventManager.notifyEvent(Event.BEFORE_CLOSE);
+ _decoratedStore.close();
+ _eventManager.notifyEvent(Event.AFTER_CLOSE);
+ }
+
+ @Override
+ public void addEventListener(EventListener eventListener, Event event)
+ {
+ _eventManager.addEventListener(eventListener, event);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java
new file mode 100644
index 0000000000..81d9645c01
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.decorators;
+
+import static org.apache.qpid.server.store.MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+
+public class OperationalLoggingDecorator extends AbstractDecorator
+{
+ protected final LogSubject _logSubject;
+
+ public OperationalLoggingDecorator(final MessageStore decoratedStore, LogSubject logSubject)
+ {
+ super(decoratedStore);
+ _logSubject = logSubject;
+ }
+
+ @Override
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler messageRecoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration config) throws Exception
+ {
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED());
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED());
+
+ if (config != null && config.getString(ENVIRONMENT_PATH_PROPERTY) != null)
+ {
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(config.getString(ENVIRONMENT_PATH_PROPERTY)));
+ }
+
+ _decoratedStore.configureMessageStore(name, messageRecoveryHandler,
+ tlogRecoveryHandler, config);
+ }
+
+ @Override
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
+ {
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED());
+
+ _decoratedStore.configureConfigStore(name, recoveryHandler, config);
+ }
+
+ @Override
+ public void activate() throws Exception
+ {
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_START());
+ _decoratedStore.activate();
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+ _decoratedStore.close();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 86304a0984..a3d1a7999d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -18,7 +18,8 @@
* under the License.
*
*/
-package org.apache.qpid.server.store;
+package org.apache.qpid.server.store.derby;
+
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
@@ -30,13 +31,24 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreConstants;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.State;
+import org.apache.qpid.server.store.StateManager;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMemoryMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogResource;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -64,9 +76,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
- * An implementation of a {@link MessageStore} that uses Apache Derby as the persistance
+ * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence
* mechanism.
- *
+ *
* TODO extract the SQL statements into a generic JDBC store
*/
public class DerbyMessageStore implements MessageStore
@@ -74,9 +86,6 @@ public class DerbyMessageStore implements MessageStore
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
- public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
-
-
private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
@@ -94,7 +103,7 @@ public class DerbyMessageStore implements MessageStore
private static final String XID_TABLE_NAME = "QPID_XIDS";
private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
-
+
private static final int DB_VERSION = 3;
@@ -157,7 +166,7 @@ public class DerbyMessageStore implements MessageStore
+ " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))";
private static final String SELECT_FROM_LINKS =
"SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb";
- private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME
+ private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME
+ " WHERE id_lsb = ? and id_msb = ?";
private static final String SELECT_ALL_FROM_LINKS = "SELECT id_lsb, id_msb, create_time, "
+ "arguments FROM " + LINKS_TABLE_NAME;
@@ -175,12 +184,12 @@ public class DerbyMessageStore implements MessageStore
+ " link_id_msb bigint not null,"
+ " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))";
private static final String SELECT_FROM_BRIDGES =
- "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM "
+ "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM "
+ BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
- private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME
+ private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME
+ " WHERE id_lsb = ? and id_msb = ?";
- private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, "
- + " create_time,"
+ private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, "
+ + " create_time,"
+ " link_id_lsb, link_id_msb, "
+ "arguments FROM " + BRIDGES_TABLE_NAME
+ " WHERE link_id_lsb = ? and link_id_msb = ?";
@@ -196,7 +205,7 @@ public class DerbyMessageStore implements MessageStore
"CREATE TABLE "+XID_TABLE_NAME+" ( format bigint not null,"
+ " global_id varchar(64) for bit data, branch_id varchar(64) for bit data, PRIMARY KEY ( format, " +
"global_id, branch_id ))";
- private static final String INSERT_INTO_XIDS =
+ private static final String INSERT_INTO_XIDS =
"INSERT INTO "+XID_TABLE_NAME+" ( format, global_id, branch_id ) values (?, ?, ?)";
private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME
+ " WHERE format = ? and global_id = ? and branch_id = ?";
@@ -214,104 +223,64 @@ public class DerbyMessageStore implements MessageStore
"queue_name, message_id ) values (?,?,?,?,?,?) ";
private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
+ " WHERE format = ? and global_id = ? and branch_id = ?";
- private static final String SELECT_ALL_FROM_XID_ACTIONS =
- "SELECT action_type, queue_name, message_id FROM " + XID_ACTIONS_TABLE_NAME +
+ private static final String SELECT_ALL_FROM_XID_ACTIONS =
+ "SELECT action_type, queue_name, message_id FROM " + XID_ACTIONS_TABLE_NAME +
" WHERE format = ? and global_id = ? and branch_id = ?";
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
+ private final StateManager _stateManager = new StateManager();
- private LogSubject _logSubject;
- private boolean _configured;
-
-
- private static final class CommitStoreFuture implements StoreFuture
- {
- public boolean isComplete()
- {
- return true;
- }
-
- public void waitForCompletion()
- {
-
- }
- }
+ private MessageStoreRecoveryHandler _messageRecoveryHandler;
- private enum State
- {
- INITIAL,
- CONFIGURING,
- RECOVERING,
- STARTED,
- CLOSING,
- CLOSED
- }
-
- private State _state = State.INITIAL;
+ private TransactionLogRecoveryHandler _tlogRecoveryHandler;
+ private ConfigurationRecoveryHandler _configRecoveryHandler;
+ @Override
public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- Configuration storeConfiguration,
- LogSubject logSubject) throws Exception
+ ConfigurationRecoveryHandler configRecoveryHandler,
+ Configuration storeConfiguration) throws Exception
{
- stateTransition(State.INITIAL, State.CONFIGURING);
- _logSubject = logSubject;
- CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
+ _stateManager.stateTransition(State.INITIAL, State.CONFIGURING);
+ _configRecoveryHandler = configRecoveryHandler;
- if(!_configured)
- {
- commonConfiguration(name, storeConfiguration, logSubject);
- _configured = true;
- }
-
- // this recovers durable exchanges, queues, and bindings
- recover(recoveryHandler);
-
-
- stateTransition(State.RECOVERING, State.STARTED);
+ commonConfiguration(name, storeConfiguration);
}
+ @Override
public void configureMessageStore(String name,
MessageStoreRecoveryHandler recoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration storeConfiguration, LogSubject logSubject) throws Exception
+ Configuration storeConfiguration) throws Exception
{
- if(!_configured)
- {
-
- _logSubject = logSubject;
- }
-
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
-
- if(!_configured)
- {
-
- commonConfiguration(name, storeConfiguration, logSubject);
- _configured = true;
- }
-
- recoverMessages(recoveryHandler);
+ _tlogRecoveryHandler = tlogRecoveryHandler;
+ _messageRecoveryHandler = recoveryHandler;
+ }
- CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
+ @Override
+ public void activate() throws Exception
+ {
+ _stateManager.stateTransition(State.CONFIGURING, State.RECOVERING);
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(tlogRecoveryHandler);
+ // this recovers durable exchanges, queues, and bindings
+ recoverConfiguration(_configRecoveryHandler);
+ recoverMessages(_messageRecoveryHandler);
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler);
recoverXids(dtxrh);
-
+ _stateManager.stateTransition(State.RECOVERING, State.ACTIVE);
}
- private void commonConfiguration(String name, Configuration storeConfiguration, LogSubject logSubject)
+ private void commonConfiguration(String name, Configuration storeConfiguration)
throws ClassNotFoundException, SQLException
{
initialiseDriver();
//Update to pick up QPID_WORK and use that as the default location not just derbyDB
- final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")
+ final String databasePath = storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")
+ File.separator + "derbyDB");
File environmentPath = new File(databasePath);
@@ -324,8 +293,6 @@ public class DerbyMessageStore implements MessageStore
}
}
- CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath()));
-
createOrOpenDatabase(name, databasePath);
}
@@ -576,19 +543,15 @@ public class DerbyMessageStore implements MessageStore
{
stmt.close();
}
-
}
- public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
+ private void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
{
- stateTransition(State.CONFIGURING, State.RECOVERING);
-
- CurrentActor.get().message(_logSubject,MessageStoreMessages.RECOVERY_START());
try
{
ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this);
- loadQueues(qrh);
+ recoverQueues(qrh);
ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
List<String> exchanges = loadExchanges(erh);
@@ -632,12 +595,12 @@ public class DerbyMessageStore implements MessageStore
Blob argumentsAsBlob = rs.getBlob(4);
byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
-
+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
int size = dis.readInt();
-
+
Map<String,String> arguments = new HashMap<String, String>();
-
+
for(int i = 0; i < size; i++)
{
arguments.put(dis.readUTF(), dis.readUTF());
@@ -744,7 +707,7 @@ public class DerbyMessageStore implements MessageStore
}
- private void loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException
+ private void recoverQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -759,6 +722,7 @@ public class DerbyMessageStore implements MessageStore
while(rs.next())
{
String queueName = rs.getString(1);
+ _logger.debug("Got queue " + queueName);
String owner = rs.getString(2);
boolean exclusive = rs.getBoolean(3);
Blob argumentsAsBlob = rs.getBlob(4);
@@ -913,10 +877,11 @@ public class DerbyMessageStore implements MessageStore
+ @Override
public void close() throws Exception
{
- CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
_closed.getAndSet(true);
+ _stateManager.stateTransition(State.ACTIVE, State.CLOSING);
try
{
@@ -926,9 +891,9 @@ public class DerbyMessageStore implements MessageStore
_logger.error("Unable to shut down the store");
}
catch (SQLException e)
- {
- if (e.getSQLState().equalsIgnoreCase(DERBY_SINGLE_DB_SHUTDOWN_CODE))
- {
+ {
+ if (e.getSQLState().equalsIgnoreCase(DERBY_SINGLE_DB_SHUTDOWN_CODE))
+ {
//expected and represents a clean shutdown of this database only, do nothing.
}
else
@@ -936,8 +901,11 @@ public class DerbyMessageStore implements MessageStore
_logger.error("Exception whilst shutting down the store: " + e);
}
}
+
+ _stateManager.stateTransition(State.CLOSING, State.CLOSED);
}
+ @Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
if(metaData.isPersistent())
@@ -1015,9 +983,10 @@ public class DerbyMessageStore implements MessageStore
}
+ @Override
public void createExchange(Exchange exchange) throws AMQStoreException
{
- if (_state != State.RECOVERING)
+ if (_stateManager.isInState(State.ACTIVE))
{
try
{
@@ -1077,6 +1046,7 @@ public class DerbyMessageStore implements MessageStore
}
+ @Override
public void removeExchange(Exchange exchange) throws AMQStoreException
{
@@ -1112,10 +1082,11 @@ public class DerbyMessageStore implements MessageStore
}
}
+ @Override
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
throws AMQStoreException
{
- if (_state != State.RECOVERING)
+ if (_stateManager.isInState(State.ACTIVE))
{
try
{
@@ -1189,6 +1160,7 @@ public class DerbyMessageStore implements MessageStore
}
+ @Override
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
throws AMQStoreException
{
@@ -1224,16 +1196,18 @@ public class DerbyMessageStore implements MessageStore
}
}
+ @Override
public void createQueue(AMQQueue queue) throws AMQStoreException
{
createQueue(queue, null);
}
+ @Override
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
{
_logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
- if (_state != State.RECOVERING)
+ if (_stateManager.isInState(State.ACTIVE))
{
try
{
@@ -1299,7 +1273,7 @@ public class DerbyMessageStore implements MessageStore
}
}
}
-
+
/**
* Updates the specified queue in the persistent store, IF it is already present. If the queue
* is not present in the store, it will not be added.
@@ -1309,9 +1283,10 @@ public class DerbyMessageStore implements MessageStore
* @param queue The queue to update the entry for.
* @throws AMQStoreException If the operation fails for any reason.
*/
+ @Override
public void updateQueue(final AMQQueue queue) throws AMQStoreException
{
- if (_state != State.RECOVERING)
+ if (_stateManager.isInState(State.ACTIVE))
{
try
{
@@ -1363,7 +1338,7 @@ public class DerbyMessageStore implements MessageStore
throw new AMQStoreException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e);
}
}
-
+
}
/**
@@ -1389,7 +1364,7 @@ public class DerbyMessageStore implements MessageStore
throw sqlEx;
}
}
-
+
return connection;
}
@@ -1419,6 +1394,7 @@ public class DerbyMessageStore implements MessageStore
return connection;
}
+ @Override
public void removeQueue(final AMQQueue queue) throws AMQStoreException
{
AMQShortString name = queue.getNameShortString();
@@ -1450,11 +1426,12 @@ public class DerbyMessageStore implements MessageStore
}
+ @Override
public void createBrokerLink(final BrokerLink link) throws AMQStoreException
{
_logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called");
- if (_state != State.RECOVERING)
+ if (_stateManager.isInState(State.ACTIVE))
{
try
{
@@ -1463,7 +1440,7 @@ public class DerbyMessageStore implements MessageStore
PreparedStatement stmt = conn.prepareStatement(FIND_LINK);
try
{
-
+
stmt.setLong(1, link.getId().getLeastSignificantBits());
stmt.setLong(2, link.getId().getMostSignificantBits());
ResultSet rs = stmt.executeQuery();
@@ -1477,7 +1454,7 @@ public class DerbyMessageStore implements MessageStore
try
{
-
+
insertStmt.setLong(1, link.getId().getLeastSignificantBits());
insertStmt.setLong(2, link.getId().getMostSignificantBits());
insertStmt.setLong(3, link.getCreateTime());
@@ -1546,6 +1523,7 @@ public class DerbyMessageStore implements MessageStore
return argumentBytes;
}
+ @Override
public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
{
_logger.debug("public void deleteBrokerLink( " + link + "): called");
@@ -1577,11 +1555,12 @@ public class DerbyMessageStore implements MessageStore
}
+ @Override
public void createBridge(final Bridge bridge) throws AMQStoreException
{
_logger.debug("public void createBridge(BrokerLink = " + bridge + "): called");
- if (_state != State.RECOVERING)
+ if (_stateManager.isInState(State.ACTIVE))
{
try
{
@@ -1647,6 +1626,7 @@ public class DerbyMessageStore implements MessageStore
}
}
+ @Override
public void deleteBridge(final Bridge bridge) throws AMQStoreException
{
_logger.debug("public void deleteBridge( " + bridge + "): called");
@@ -1677,6 +1657,7 @@ public class DerbyMessageStore implements MessageStore
}
+ @Override
public Transaction newTransaction()
{
return new DerbyTransaction();
@@ -1695,7 +1676,7 @@ public class DerbyMessageStore implements MessageStore
{
_logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
}
-
+
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
try
{
@@ -1834,7 +1815,7 @@ public class DerbyMessageStore implements MessageStore
{
stmt.close();
}
-
+
stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS);
try
@@ -1879,7 +1860,7 @@ public class DerbyMessageStore implements MessageStore
}
}
-
+
private static final class ConnectionWrapper
{
private final Connection _connection;
@@ -1924,7 +1905,7 @@ public class DerbyMessageStore implements MessageStore
public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQStoreException
{
commitTran(connWrapper);
- return new CommitStoreFuture();
+ return StoreFuture.IMMEDIATE_FUTURE;
}
public void abortTran(ConnectionWrapper connWrapper) throws AMQStoreException
@@ -1965,7 +1946,7 @@ public class DerbyMessageStore implements MessageStore
{
_logger.debug("Adding metadata for message " +messageId);
}
-
+
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA);
try
{
@@ -2008,7 +1989,7 @@ public class DerbyMessageStore implements MessageStore
{
stmt.close();
}
-
+
}
@@ -2154,31 +2135,37 @@ public class DerbyMessageStore implements MessageStore
_messageNumber = messageNumber;
}
+ @Override
public TransactionLogResource getQueue()
{
return this;
}
+ @Override
public EnqueableMessage getMessage()
{
return this;
}
+ @Override
public long getMessageNumber()
{
return _messageNumber;
}
+ @Override
public boolean isPersistent()
{
return true;
}
+ @Override
public StoredMessage getStoredMessage()
{
throw new UnsupportedOperationException();
}
+ @Override
public String getResourceName()
{
return _queueName;
@@ -2191,7 +2178,7 @@ public class DerbyMessageStore implements MessageStore
try
{
List<Xid> xids = new ArrayList<Xid>();
-
+
Statement stmt = conn.createStatement();
try
{
@@ -2217,15 +2204,15 @@ public class DerbyMessageStore implements MessageStore
stmt.close();
}
-
-
+
+
for(Xid xid : xids)
{
List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
-
+
PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
-
+
try
{
pstmt.setLong(1, xid.getFormat());
@@ -2256,13 +2243,13 @@ public class DerbyMessageStore implements MessageStore
{
pstmt.close();
}
-
- dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
- enqueues.toArray(new RecordImpl[enqueues.size()]),
+
+ dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
+ enqueues.toArray(new RecordImpl[enqueues.size()]),
dequeues.toArray(new RecordImpl[dequeues.size()]));
}
-
-
+
+
dtxrh.completeDtxRecordRecovery();
}
finally
@@ -2271,7 +2258,7 @@ public class DerbyMessageStore implements MessageStore
}
}
-
+
StorableMessageMetaData getMetaData(long messageId) throws SQLException
{
@@ -2417,24 +2404,13 @@ public class DerbyMessageStore implements MessageStore
}
+ @Override
public boolean isPersistent()
{
return true;
}
- private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException
- {
- if (_state != requiredState)
- {
- throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
- + "; currently in state: " + _state);
- }
-
- _state = newState;
- }
-
-
private class DerbyTransaction implements Transaction
{
private final ConnectionWrapper _connWrapper;
@@ -2452,6 +2428,7 @@ public class DerbyMessageStore implements MessageStore
}
}
+ @Override
public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
if(message.getStoredMessage() instanceof StoredDerbyMessage)
@@ -2469,32 +2446,38 @@ public class DerbyMessageStore implements MessageStore
DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
}
+ @Override
public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
}
+ @Override
public void commitTran() throws AMQStoreException
{
DerbyMessageStore.this.commitTran(_connWrapper);
}
+ @Override
public StoreFuture commitTranAsync() throws AMQStoreException
{
return DerbyMessageStore.this.commitTranAsync(_connWrapper);
}
+ @Override
public void abortTran() throws AMQStoreException
{
DerbyMessageStore.this.abortTran(_connWrapper);
}
+ @Override
public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException
{
DerbyMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
}
+ @Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
throws AMQStoreException
{
@@ -2512,7 +2495,7 @@ public class DerbyMessageStore implements MessageStore
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
private byte[] _data;
private volatile SoftReference<byte[]> _dataRef;
-
+
StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
{
@@ -2524,15 +2507,16 @@ public class DerbyMessageStore implements MessageStore
StorableMessageMetaData metaData, boolean persist)
{
_messageId = messageId;
-
+
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
if(persist)
{
- _metaData = metaData;
+ _metaData = metaData;
}
}
+ @Override
public StorableMessageMetaData getMetaData()
{
StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
@@ -2552,11 +2536,13 @@ public class DerbyMessageStore implements MessageStore
return metaData;
}
+ @Override
public long getMessageNumber()
{
return _messageId;
}
+ @Override
public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
src = src.slice();
@@ -2576,9 +2562,10 @@ public class DerbyMessageStore implements MessageStore
System.arraycopy(oldData,0,_data,0,oldData.length);
src.duplicate().get(_data, oldData.length, src.remaining());
}
-
+
}
+ @Override
public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
{
byte[] data = _dataRef == null ? null : _dataRef.get();
@@ -2595,6 +2582,7 @@ public class DerbyMessageStore implements MessageStore
}
+ @Override
public ByteBuffer getContent(int offsetInMessage, int size)
{
ByteBuffer buf = ByteBuffer.allocate(size);
@@ -2603,6 +2591,7 @@ public class DerbyMessageStore implements MessageStore
return buf;
}
+ @Override
public synchronized StoreFuture flushToStore()
{
try
@@ -2612,7 +2601,7 @@ public class DerbyMessageStore implements MessageStore
Connection conn = newConnection();
store(conn);
-
+
conn.commit();
conn.close();
}
@@ -2651,6 +2640,7 @@ public class DerbyMessageStore implements MessageStore
}
}
+ @Override
public void remove()
{
DerbyMessageStore.this.removeMessage(_messageId);
@@ -2687,4 +2677,21 @@ public class DerbyMessageStore implements MessageStore
}
}
+ @Override
+ public void addEventListener(EventListener eventListener, Event event)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public MessageStore getUnderlyingStore()
+ {
+ return this;
+ }
+
+ public String getDatabaseProviderName()
+ {
+ return DerbyMessageStore.class.getName();
+ }
+
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
new file mode 100644
index 0000000000..02b59dfc06
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.derby;
+
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreFactory;
+import org.apache.qpid.server.store.decorators.EventDecorator;
+import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator;
+
+public class DerbyMessageStoreFactory implements MessageStoreFactory
+{
+
+ @Override
+ public MessageStore createMessageStore(LogSubject logSubject)
+ {
+ return new OperationalLoggingDecorator(new EventDecorator(new DerbyMessageStore()), logSubject);
+ }
+
+ @Override
+ public String getStoreClassName()
+ {
+ return DerbyMessageStore.class.getSimpleName();
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index 28d8cb2ec7..5460c89eab 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.virtualhost.State;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
import org.slf4j.Logger;
@@ -177,6 +178,11 @@ public class ServerConnectionDelegate extends ServerDelegate
sconn.setState(Connection.State.CLOSING);
sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'"));
}
+ else if (vhost.getState() != State.ACTIVE)
+ {
+ sconn.setState(Connection.State.CLOSING);
+ sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Virtual host '"+vhostName+"' is not active"));
+ }
else
{
sconn.setState(Connection.State.OPEN);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java
new file mode 100644
index 0000000000..fb50b3e289
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+public enum State
+{
+ INITIALISING,
+ ACTIVE,
+ PASSIVE,
+ STOPPED
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 00c8d1ff27..2ef110641e 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -97,4 +97,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo
void removeBrokerConnection(BrokerLink brokerLink);
ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
+
+ State getState();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 1da5b8d0c7..1db26a5f04 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
@@ -92,7 +93,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
public VirtualHostConfigRecoveryHandler begin(MessageStore store)
{
- _logSubject = new MessageStoreLogSubject(_virtualHost,store);
+ _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName());
_store = store;
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
@@ -355,31 +356,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
}
- private static final class ProcessAction
- {
- private final AMQQueue _queue;
- private final AMQMessage _message;
-
- public ProcessAction(AMQQueue queue, AMQMessage message)
- {
- _queue = queue;
- _message = message;
- }
-
- public void process()
- {
- try
- {
- _queue.enqueue(_message);
- }
- catch(AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- }
-
public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf)
{
try
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index eccaf553cd..530be46d70 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -20,15 +20,11 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.util.concurrent.ScheduledFuture;
-import org.apache.commons.configuration.Configuration;
+
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.configuration.BrokerConfig;
@@ -45,9 +41,7 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
@@ -62,26 +56,28 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+import org.apache.qpid.server.store.MessageStoreFactory;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
+import javax.management.JMException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
+
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
public class VirtualHostImpl implements VirtualHost
{
private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class);
@@ -104,29 +100,31 @@ public class VirtualHostImpl implements VirtualHost
private final BrokerConfig _brokerConfig;
- private final VirtualHostConfiguration _configuration;
+ private final VirtualHostConfiguration _vhostConfig;
- private ConnectionRegistry _connectionRegistry;
+ private final VirtualHostMBean _virtualHostMBean;
- private QueueRegistry _queueRegistry;
+ private final AMQBrokerManagerMBean _brokerMBean;
- private ExchangeRegistry _exchangeRegistry;
+ private final QueueRegistry _queueRegistry;
- private ExchangeFactory _exchangeFactory;
+ private final ExchangeRegistry _exchangeRegistry;
- private MessageStore _messageStore;
+ private final ExchangeFactory _exchangeFactory;
- private DtxRegistry _dtxRegistry;
+ private final ConnectionRegistry _connectionRegistry;
- private VirtualHostMBean _virtualHostMBean;
+ private final BindingFactory _bindingFactory;
- private AMQBrokerManagerMBean _brokerMBean;
+ private final DtxRegistry _dtxRegistry;
- private BindingFactory _bindingFactory;
+ private final MessageStore _messageStore;
+
+ private State _state = State.INITIALISING;
private boolean _statisticsEnabled = false;
- private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
{
@@ -135,53 +133,45 @@ public class VirtualHostImpl implements VirtualHost
throw new IllegalArgumentException("HostConfig cannot be null");
}
+ if (hostConfig.getName() == null || hostConfig.getName().length() == 0)
+ {
+ throw new IllegalArgumentException("Illegal name (" + hostConfig.getName() + ") for virtualhost.");
+ }
+
_appRegistry = appRegistry;
_brokerConfig = _appRegistry.getBroker();
- _configuration = hostConfig;
- _name = _configuration.getName();
+ _vhostConfig = hostConfig;
+ _name = _vhostConfig.getName();
_dtxRegistry = new DtxRegistry();
_id = _appRegistry.getConfigStore().createId();
CurrentActor.get().message(VirtualHostMessages.CREATED(_name));
- if (_name == null || _name.length() == 0)
- {
- throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
- }
-
- _securityManager = new SecurityManager(_appRegistry.getSecurityManager());
- _securityManager.configureHostPlugins(_configuration);
-
_virtualHostMBean = new VirtualHostMBean();
+ _securityManager = new SecurityManager(_appRegistry.getSecurityManager());
+ _securityManager.configureHostPlugins(_vhostConfig);
_connectionRegistry = new ConnectionRegistry();
- _houseKeepingTasks = new ScheduledThreadPoolExecutor(_configuration.getHouseKeepingThreadCount());
+ _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount());
_queueRegistry = new DefaultQueueRegistry(this);
_exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeFactory.initialise(_configuration);
+ _exchangeFactory.initialise(_vhostConfig);
_exchangeRegistry = new DefaultExchangeRegistry(this);
- StartupRoutingTable configFileRT = new StartupRoutingTable();
-
- _messageStore = configFileRT;
-
- // This needs to be after the RT has been defined as it creates the default durable exchanges.
- _exchangeRegistry.initialise();
-
_bindingFactory = new BindingFactory(this);
- initialiseModel(_configuration);
+ _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
- initialiseMessageStore(hostConfig);
+ _messageStore = initialiseMessageStore(hostConfig.getMessageStoreFactoryClass());
- _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
- _brokerMBean.register();
- initialiseHouseKeeping(hostConfig.getHousekeepingCheckPeriod());
+ configureMessageStore(hostConfig);
+
+ activateNonHAMessageStore();
initialiseStatistics();
}
@@ -193,7 +183,7 @@ public class VirtualHostImpl implements VirtualHost
public VirtualHostConfiguration getConfiguration()
{
- return _configuration;
+ return _vhostConfig;
}
public UUID getId()
@@ -217,47 +207,16 @@ public class VirtualHostImpl implements VirtualHost
}
/**
- * Virtual host JMX MBean class.
- *
- * This has some of the methods implemented from management interface for exchanges. Any
- * Implementation of an Exchange MBean should extend this class.
- */
- public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
- {
- public VirtualHostMBean() throws NotCompliantMBeanException
- {
- super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE);
- }
-
- public String getObjectInstanceName()
- {
- return ObjectName.quote(_name);
- }
-
- public String getName()
- {
- return _name;
- }
-
- public VirtualHostImpl getVirtualHost()
- {
- return VirtualHostImpl.this;
- }
- }
-
-
- /**
* Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers
* and checking for idle or open transactions that have exceeded the permitted thresholds.
*
* @param period
*/
- private void initialiseHouseKeeping(long period)
+ private void initialiseHouseKeeping(long period)
{
+
if (period != 0L)
{
-
-
scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask());
Map<String, VirtualHostPluginFactory> plugins = _appRegistry.getPluginManager().getVirtualHostPlugins();
@@ -290,50 +249,30 @@ public class VirtualHostImpl implements VirtualHost
}
}
- private class VirtualHostHouseKeepingTask extends HouseKeepingTask
+ private void shutdownHouseKeeping()
{
- public VirtualHostHouseKeepingTask()
- {
- super(VirtualHostImpl.this);
- }
+ _houseKeepingTasks.shutdown();
- public void execute()
+ try
+ {
+ if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS))
+ {
+ _houseKeepingTasks.shutdownNow();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted during Housekeeping shutdown:", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void removeHouseKeepingTasks()
+ {
+ BlockingQueue<Runnable> taskQueue = _houseKeepingTasks.getQueue();
+ for (final Runnable runnable : taskQueue)
{
- for (AMQQueue q : _queueRegistry.getQueues())
- {
- _logger.debug("Checking message status for queue: "
- + q.getName());
- try
- {
- q.checkMessageStatus();
- }
- catch (Exception e)
- {
- _logger.error("Exception in housekeeping for queue: "
- + q.getNameShortString().toString(), e);
- //Don't throw exceptions as this will stop the
- // house keeping task from running.
- }
- }
- for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
- {
- _logger.debug("Checking for long running open transactions on connection " + connection);
- for (AMQSessionModel session : connection.getSessionModels())
- {
- _logger.debug("Checking for long running open transactions on session " + session);
- try
- {
- session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
- _configuration.getTransactionTimeoutOpenClose(),
- _configuration.getTransactionTimeoutIdleWarn(),
- _configuration.getTransactionTimeoutIdleClose());
- }
- catch (Exception e)
- {
- _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
- }
- }
- }
+ _houseKeepingTasks.remove(runnable);
}
}
@@ -381,36 +320,43 @@ public class VirtualHostImpl implements VirtualHost
}
- private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
+ private MessageStore initialiseMessageStore(final String messageStoreFactoryClass) throws Exception
{
- String messageStoreClass = hostConfig.getMessageStoreClass();
-
- Class<?> clazz = Class.forName(messageStoreClass);
- Object o = clazz.newInstance();
+ final Class<?> clazz = Class.forName(messageStoreFactoryClass);
+ final Object o = clazz.newInstance();
- if (!(o instanceof MessageStore))
+ if (!(o instanceof MessageStoreFactory))
{
- throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
- " does not.");
+ throw new ClassCastException("Message store factory class must implement " + MessageStoreFactory.class +
+ ". Class " + clazz + " does not.");
}
- MessageStore messageStore = (MessageStore) o;
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
- MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore);
+ final MessageStoreFactory messageStoreFactory = (MessageStoreFactory) o;
+ final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStoreFactory.getStoreClassName());
+ final MessageStore messageStore = messageStoreFactory.createMessageStore(storeLogSubject);
- messageStore.configureConfigStore(this.getName(),
- recoveryHandler,
- hostConfig.getStoreConfiguration(),
- storeLogSubject);
+ messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE);
+ messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE);
+ messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE);
+ messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE);
- messageStore.configureMessageStore(this.getName(),
- recoveryHandler,
- recoveryHandler,
- hostConfig.getStoreConfiguration(), storeLogSubject);
+ return messageStore;
+ }
- _messageStore = messageStore;
+ private void configureMessageStore(VirtualHostConfiguration hostConfig) throws Exception
+ {
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
+ // TODO perhaps pass config on construction??
+ _messageStore.configureConfigStore(getName(), recoveryHandler, hostConfig.getStoreConfiguration());
+ _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler, hostConfig.getStoreConfiguration());
+
+ }
+
+ private void activateNonHAMessageStore() throws Exception
+ {
+ _messageStore.activate();
}
private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
@@ -424,7 +370,7 @@ public class VirtualHostImpl implements VirtualHost
configureExchange(config.getExchangeConfiguration(exchangeName));
}
- String[] queueNames = config.getQueueNames();
+ String[] queueNames = config.getQueueNames();
for (Object queueNameObj : queueNames)
{
@@ -435,16 +381,16 @@ public class VirtualHostImpl implements VirtualHost
private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException
{
- AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName());
+ AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName());
Exchange exchange;
exchange = _exchangeRegistry.getExchange(exchangeName);
if (exchange == null)
{
- AMQShortString type = new AMQShortString(exchangeConfiguration.getType());
- boolean durable = exchangeConfiguration.getDurable();
- boolean autodelete = exchangeConfiguration.getAutoDelete();
+ AMQShortString type = new AMQShortString(exchangeConfiguration.getType());
+ boolean durable = exchangeConfiguration.getDurable();
+ boolean autodelete = exchangeConfiguration.getAutoDelete();
Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0);
_exchangeRegistry.registerExchange(newExchange);
@@ -458,7 +404,7 @@ public class VirtualHostImpl implements VirtualHost
private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
String queueName = queue.getName();
if (queue.isDurable())
@@ -467,13 +413,13 @@ public class VirtualHostImpl implements VirtualHost
}
//get the exchange name (returns default exchange name if none was specified)
- String exchangeName = queueConfiguration.getExchange();
+ String exchangeName = queueConfiguration.getExchange();
Exchange exchange = _exchangeRegistry.getExchange(exchangeName);
- if (exchange == null)
- {
+ if (exchange == null)
+ {
throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName);
- }
+ }
Exchange defaultExchange = _exchangeRegistry.getDefaultExchange();
@@ -561,39 +507,8 @@ public class VirtualHostImpl implements VirtualHost
{
//Stop Connections
_connectionRegistry.close();
-
- //Stop the Queues processing
- if (_queueRegistry != null)
- {
- for (AMQQueue queue : _queueRegistry.getQueues())
- {
- queue.stop();
- }
- }
-
- //Stop Housekeeping
- if (_houseKeepingTasks != null)
- {
- _houseKeepingTasks.shutdown();
-
- try
- {
- if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS))
- {
- _houseKeepingTasks.shutdownNow();
- }
- }
- catch (InterruptedException e)
- {
- _logger.warn("Interrupted during Housekeeping shutdown:" + e.getMessage());
- // Swallowing InterruptedException ok as we are shutting down.
- }
- }
-
- if(_dtxRegistry != null)
- {
- _dtxRegistry.close();
- }
+ _queueRegistry.stopAllAndUnregisterMBeans();
+ _dtxRegistry.close();
//Close MessageStore
if (_messageStore != null)
@@ -609,6 +524,8 @@ public class VirtualHostImpl implements VirtualHost
}
}
+ _state = State.STOPPED;
+
CurrentActor.get().message(VirtualHostMessages.CLOSED());
}
@@ -720,7 +637,6 @@ public class VirtualHostImpl implements VirtualHost
return blink;
}
-
public void createBrokerConnection(final String transport,
final String host,
final int port,
@@ -767,105 +683,165 @@ public class VirtualHostImpl implements VirtualHost
return _dtxRegistry;
}
- /**
- * Temporary Startup RT class to record the creation of persistent queues / exchanges.
- *
- *
- * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded.
- * This should be removed after the _RT has been fully split from the the TL
- */
- private static class StartupRoutingTable implements MessageStore
+ @Override
+ public String toString()
{
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- Configuration config,
- LogSubject logSubject) throws Exception
- {
- }
-
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
- }
-
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
- }
-
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
- {
- }
-
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
- {
- }
-
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- }
-
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
- {
- }
-
- public void removeQueue(AMQQueue queue) throws AMQStoreException
- {
- }
-
- public void updateQueue(AMQQueue queue) throws AMQStoreException
- {
- }
+ return _name;
+ }
- public void createBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- }
+ @Override
+ public State getState()
+ {
+ return _state;
+ }
- public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- }
- public void createBridge(final Bridge bridge) throws AMQStoreException
+ /**
+ * Virtual host JMX MBean class.
+ *
+ * This has some of the methods implemented from management interface for exchanges. Any
+ * Implementation of an Exchange MBean should extend this class.
+ */
+ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ {
+ public VirtualHostMBean() throws NotCompliantMBeanException
{
+ super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE);
}
- public void deleteBridge(final Bridge bridge) throws AMQStoreException
+ public String getObjectInstanceName()
{
+ return ObjectName.quote(_name);
}
- @Override
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config, LogSubject logSubject) throws Exception
+ public String getName()
{
+ return _name;
}
- @Override
- public void close() throws Exception
+ public VirtualHostImpl getVirtualHost()
{
+ return VirtualHostImpl.this;
}
+ }
- @Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(
- T metaData)
- {
- return null;
- }
+ private final class BeforeActivationListener implements EventListener
+ {
+ @Override
+ public void event(Event event)
+ {
+ try
+ {
+ _exchangeRegistry.initialise();
+ initialiseModel(_vhostConfig);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to initialise virtual host after state change", e);
+ }
+ }
+ }
+
+ private final class AfterActivationListener implements EventListener
+ {
+ @Override
+ public void event(Event event)
+ {
+ initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+ try
+ {
+ _brokerMBean.register();
+ }
+ catch (JMException e)
+ {
+ throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
+ }
+
+ _state = State.ACTIVE;
+ }
+ }
+
+ public class BeforePassivationListener implements EventListener
+ {
@Override
- public boolean isPersistent()
+ public void event(Event event)
{
- return false;
- }
+ _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
+ _brokerMBean.unregister();
+ removeHouseKeepingTasks();
- @Override
- public Transaction newTransaction()
- {
- return null;
- }
- }
+ _queueRegistry.stopAllAndUnregisterMBeans();
+ _exchangeRegistry.clearAndUnregisterMbeans();
+ _dtxRegistry.close();
- @Override
- public String toString()
- {
- return _name;
- }
+ _state = State.PASSIVE;
+ }
+ }
+
+ private final class BeforeCloseListener implements EventListener
+ {
+ @Override
+ public void event(Event event)
+ {
+ _brokerMBean.unregister();
+ shutdownHouseKeeping();
+ }
+ }
+
+ private class VirtualHostHouseKeepingTask extends HouseKeepingTask
+ {
+ public VirtualHostHouseKeepingTask()
+ {
+ super(VirtualHostImpl.this);
+ }
+
+ public void execute()
+ {
+ for (AMQQueue q : _queueRegistry.getQueues())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Checking message status for queue: "
+ + q.getName());
+ }
+ try
+ {
+ q.checkMessageStatus();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Exception in housekeeping for queue: "
+ + q.getNameShortString().toString(), e);
+ //Don't throw exceptions as this will stop the
+ // house keeping task from running.
+ }
+ }
+ for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Checking for long running open transactions on connection " + connection);
+ }
+ for (AMQSessionModel session : connection.getSessionModels())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Checking for long running open transactions on session " + session);
+ }
+ try
+ {
+ session.checkTransactionStatus(_vhostConfig.getTransactionTimeoutOpenWarn(),
+ _vhostConfig.getTransactionTimeoutOpenClose(),
+ _vhostConfig.getTransactionTimeoutIdleWarn(),
+ _vhostConfig.getTransactionTimeoutIdleClose());
+ }
+ catch (Exception e)
+ {
+ _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
+ }
+ }
+ }
+ }
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
index d34d1bbef3..5c500771c2 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -165,7 +166,7 @@ public class AMQBrokerManagerMBeanTest extends QpidTestCase
XMLConfiguration configXml = new XMLConfiguration();
configXml.addProperty("virtualhosts.virtualhost(-1).name", "test");
- configXml.addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName());
+ configXml.addProperty("virtualhosts.virtualhost(-1).test.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
ServerConfiguration configuration = new ServerConfiguration(configXml);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
index 50e7f0588b..c4c93acfb6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -161,7 +162,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "r2d2");
getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues.queue.r2d2.deadLetterQueues", "true");
getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "c3p0");
- getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.class", TestableMemoryMessageStore.class.getName());
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
// Start the broker now.
super.createBroker();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 488f251b0a..b6ee95a1cb 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -20,8 +20,18 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.log4j.Logger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -31,7 +41,6 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageMetaData;
@@ -44,23 +53,10 @@ import org.apache.qpid.server.queue.MockStoredMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
{
private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class);
@@ -68,24 +64,6 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
private final HeadersExchange exchange = new HeadersExchange();
protected final Set<TestQueue> queues = new HashSet<TestQueue>();
-
-
- /**
- * Not used in this test, just there to stub out the routing calls
- */
- private MemoryMessageStore _store = new MemoryMessageStore();
-
-
- private BindingFactory bindingFactory = new BindingFactory(new DurableConfigurationStore.Source()
- {
-
- public DurableConfigurationStore getMessageStore()
- {
- return _store;
- }
- },
- exchange);
-
private int count;
public void testDoNothing()
@@ -103,7 +81,6 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
protected void unbind(TestQueue queue, String... bindings) throws AMQException
{
String queueName = queue.getName();
- //TODO - check this
exchange.onUnbind(new Binding(null,queueName, queue, exchange, getHeadersMap(bindings)));
}
@@ -538,12 +515,6 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
}
- public AMQMessage getUnderlyingMessage()
- {
- return Message.this;
- }
-
-
public ContentHeaderBody getContentHeader()
{
try
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java
index cc032a0430..3377573b9d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java
@@ -29,12 +29,10 @@ public class MessageStoreMessagesTest extends AbstractTestMessages
{
public void testMessageStoreCreated()
{
- String name = "DerbyMessageStore";
-
- _logMessage = MessageStoreMessages.CREATED(name);
+ _logMessage = MessageStoreMessages.CREATED();
List<Object> log = performLog();
- String[] expected = {"Created :", name};
+ String[] expected = {"Created"};
validateLogMessage(log, "MST-1001", expected);
}
@@ -70,56 +68,4 @@ public class MessageStoreMessagesTest extends AbstractTestMessages
validateLogMessage(log, "MST-1004", expected);
}
-/*
- public void testMessageStoreRecoveryStart_withQueue()
- {
- String queueName = "testQueue";
-
- _logMessage = MessageStoreMessages.RECOVERY_START(queueName, true);
- List<Object> log = performLog();
-
- String[] expected = {"Recovery Start :", queueName};
-
- validateLogMessage(log, "MST-1004", expected);
- }
-
- public void testMessageStoreRecovered()
- {
- String queueName = "testQueue";
- Integer messasgeCount = 2000;
-
- _logMessage = MessageStoreMessages.MST_RECOVERED(messasgeCount, queueName);
- List<Object> log = performLog();
-
- // Here we use MessageFormat to ensure the messasgeCount of 2000 is
- // reformated for display as '2,000'
- String[] expected = {"Recovered ",
- MessageFormat.format("{0,number}", messasgeCount),
- "messages for queue", queueName};
-
- validateLogMessage(log, "MST-1005", expected);
- }
-
- public void testMessageStoreRecoveryComplete()
- {
- _logMessage = MessageStoreMessages.MST_RECOVERY_COMPLETE(null,false);
- List<Object> log = performLog();
-
- String[] expected = {"Recovery Complete"};
-
- validateLogMessage(log, "MST-1006", expected);
- }
-
- public void testMessageStoreRecoveryComplete_withQueue()
- {
- String queueName = "testQueue";
-
- _logMessage = MessageStoreMessages.MST_RECOVERY_COMPLETE(queueName, true);
- List<Object> log = performLog();
-
- String[] expected = {"Recovery Complete :", queueName};
-
- validateLogMessage(log, "MST-1006", expected);
- }
- */
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
index 158fb667a9..c62b24c3b9 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
@@ -37,13 +37,13 @@ public class MessageStoreLogSubjectTest extends AbstractTestLogSubject
_testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().
getVirtualHost("test");
- _subject = new MessageStoreLogSubject(_testVhost, _testVhost.getMessageStore());
+ _subject = new MessageStoreLogSubject(_testVhost, _testVhost.getMessageStore().getClass().getSimpleName());
}
/**
* Validate that the logged Subject message is as expected:
* MESSAGE [Blank][vh(/test)/ms(MemoryMessageStore)] <Log Message>
- * @param message the message whos format needs validation
+ * @param message the message who's format needs validation
*/
@Override
protected void validateLogStatement(String message)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index 71d5211470..fe9bcc57a6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -30,7 +30,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -45,7 +45,7 @@ public class AMQProtocolSessionMBeanTest extends InternalBrokerBaseCase
/** Used for debugging. */
private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class);
- private MessageStore _messageStore = new SkeletonMessageStore();
+ private MessageStore _messageStore = new TestableMemoryMessageStore();
private AMQProtocolEngine _protocolSession;
private AMQChannel _channel;
private AMQProtocolSessionMBean _mbean;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 337ff194c3..2e3ff90df9 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -53,7 +54,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
XMLConfiguration configXml = new XMLConfiguration();
configXml.addProperty("virtualhosts.virtualhost(-1).name", getName());
- configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
+ configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
ServerConfiguration configuration = new ServerConfiguration(configXml);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 45933e7064..d588cdd42c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -157,7 +157,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
private void verifyBrokerState()
{
- TestableMemoryMessageStore store = (TestableMemoryMessageStore) getVirtualHost().getMessageStore();
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) getVirtualHost().getMessageStore().getUnderlyingStore();
// Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 273f0dc018..409b9fd92e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -37,7 +36,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -53,13 +52,11 @@ import java.util.Set;
*/
public class AckTest extends InternalBrokerBaseCase
{
- private static final Logger _log = Logger.getLogger(AckTest.class);
-
private Subscription _subscription;
private AMQProtocolSession _protocolSession;
- private TestMemoryMessageStore _messageStore;
+ private TestableMemoryMessageStore _messageStore;
private AMQChannel _channel;
@@ -73,7 +70,7 @@ public class AckTest extends InternalBrokerBaseCase
{
super.setUp();
_virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
- _messageStore = new TestMemoryMessageStore();
+ _messageStore = new TestableMemoryMessageStore();
_protocolSession = new InternalTestProtocolSession(_virtualHost);
_channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
@@ -180,7 +177,7 @@ public class AckTest extends InternalBrokerBaseCase
}
catch (InterruptedException e)
{
- e.printStackTrace(); //TODO.
+ Thread.currentThread().interrupt();
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index a8676bf4c2..7c3098298e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -42,6 +42,7 @@ import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -105,9 +106,9 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance();
PropertiesConfiguration env = new PropertiesConfiguration();
- VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(getClass().getName(), env);
- vHostConfig.setMessageStoreClass(TestableMemoryMessageStore.class.getName());
- _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), vHostConfig);
+ final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration(getClass().getName(), env);
+ vhostConfig.setMessageStoreFactoryClass(TestableMemoryMessageStoreFactory.class.getName());
+ _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), vhostConfig);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, false, _virtualHost, _arguments);
@@ -635,7 +636,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
qs.add(_queue);
MessageMetaData metaData = msg.headersReceived(System.currentTimeMillis());
- TestableMemoryMessageStore store = (TestableMemoryMessageStore) _virtualHost.getMessageStore();
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) _virtualHost.getMessageStore().getUnderlyingStore();
StoredMessage handle = store.addMessage(metaData);
msg.setStoredMessage(handle);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/EventManagerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/EventManagerTest.java
new file mode 100644
index 0000000000..2be79c5839
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/EventManagerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.apache.qpid.server.store.Event.AFTER_ACTIVATE;
+import static org.apache.qpid.server.store.Event.BEFORE_ACTIVATE;
+import junit.framework.TestCase;
+
+public class EventManagerTest extends TestCase
+{
+ private EventManager _eventManager = new EventManager();
+ private EventListener _mockListener = mock(EventListener.class);
+
+ public void testEventListenerFires()
+ {
+ _eventManager.addEventListener(_mockListener, BEFORE_ACTIVATE);
+ _eventManager.notifyEvent(BEFORE_ACTIVATE);
+ verify(_mockListener).event(BEFORE_ACTIVATE);
+ }
+
+ public void testEventListenerDoesntFire()
+ {
+ _eventManager.addEventListener(_mockListener, BEFORE_ACTIVATE);
+ _eventManager.notifyEvent(AFTER_ACTIVATE);
+ verifyZeroInteractions(_mockListener);
+ }
+
+ public void testEventListenerFiresMulitpleTimes()
+ {
+ _eventManager.addEventListener(_mockListener, BEFORE_ACTIVATE);
+ _eventManager.addEventListener(_mockListener, AFTER_ACTIVATE);
+
+ _eventManager.notifyEvent(BEFORE_ACTIVATE);
+ verify(_mockListener).event(BEFORE_ACTIVATE);
+
+ _eventManager.notifyEvent(AFTER_ACTIVATE);
+ verify(_mockListener).event(AFTER_ACTIVATE);
+ }
+
+ public void testMultipleListenersFireForSameEvent()
+ {
+ final EventListener mockListener1 = mock(EventListener.class);
+ final EventListener mockListener2 = mock(EventListener.class);
+
+ _eventManager.addEventListener(mockListener1, BEFORE_ACTIVATE);
+ _eventManager.addEventListener(mockListener2, BEFORE_ACTIVATE);
+ _eventManager.notifyEvent(BEFORE_ACTIVATE);
+
+ verify(mockListener1).event(BEFORE_ACTIVATE);
+ verify(mockListener2).event(BEFORE_ACTIVATE);
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 755d61a260..c589bd108b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -101,7 +101,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
String storePath = System.getProperty("QPID_WORK") + "/" + getName();
_config = new PropertiesConfiguration();
- _config.addProperty("store.class", getTestProfileMessageStoreClassName());
+ _config.addProperty("store.factoryclass", getTestProfileMessageStoreFactoryClassName());
_config.addProperty("store.environment-path", storePath);
cleanup(new File(storePath));
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
index 2ffa157ca8..4aa023a25c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
@@ -34,12 +34,12 @@ import org.apache.qpid.test.utils.QpidTestCase;
*/
public class ReferenceCountingTest extends QpidTestCase
{
- private TestMemoryMessageStore _store;
+ private TestableMemoryMessageStore _store;
protected void setUp() throws Exception
{
- _store = new TestMemoryMessageStore();
+ _store = new TestableMemoryMessageStore();
}
/**
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
deleted file mode 100644
index 38d3fb78fc..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.commons.configuration.Configuration;
-
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-
-/**
- * A message store that does nothing. Designed to be used in tests that do not want to use any message store
- * functionality.
- */
-public class SkeletonMessageStore implements MessageStore
-{
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- Configuration config,
- LogSubject logSubject) throws Exception
- {
- }
-
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration config, LogSubject logSubject) throws Exception
- {
- }
-
- public void close() throws Exception
- {
- }
-
- public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
- {
- return null;
- }
-
-
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
-
- }
-
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
-
- }
-
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
- {
-
- }
-
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
- {
-
- }
-
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- }
-
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
- {
- }
-
- public boolean isPersistent()
- {
- return false;
- }
-
- public void removeQueue(final AMQQueue queue) throws AMQStoreException
- {
-
- }
-
- public Transaction newTransaction()
- {
- return new Transaction()
- {
-
- public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
- {
-
- }
-
- public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
- {
-
- }
-
- public void commitTran() throws AMQStoreException
- {
-
- }
-
- public StoreFuture commitTranAsync() throws AMQStoreException
- {
- return new StoreFuture()
- {
- public boolean isComplete()
- {
- return true;
- }
-
- public void waitForCompletion()
- {
-
- }
- };
- }
-
- public void abortTran() throws AMQStoreException
- {
-
- }
-
- public void removeXid(long format, byte[] globalId, byte[] branchId)
- {
- }
-
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
- {
- }
- };
- }
-
- public void updateQueue(AMQQueue queue) throws AMQStoreException
- {
-
- }
-
- @Override
- public void createBrokerLink(BrokerLink link) throws AMQStoreException
- {
- }
-
- @Override
- public void deleteBrokerLink(BrokerLink link) throws AMQStoreException
- {
- }
-
- @Override
- public void createBridge(Bridge bridge) throws AMQStoreException
- {
- }
-
- @Override
- public void deleteBridge(Bridge bridge) throws AMQStoreException
- {
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
new file mode 100644
index 0000000000..b09dcbbdf3
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+
+import junit.framework.TestCase;
+
+public class StateManagerTest extends TestCase
+{
+
+ private StateManager _manager = new StateManager();
+
+ public void testInitialState()
+ {
+ assertEquals(State.INITIAL, _manager.getState());
+ }
+
+ public void testStateTransitionAllowed()
+ {
+ assertEquals(State.INITIAL, _manager.getState());
+
+ _manager.stateTransition(State.INITIAL, State.ACTIVE);
+ assertEquals(State.ACTIVE, _manager.getState());
+ }
+
+ public void testStateTransitionDisallowed()
+ {
+ assertEquals(State.INITIAL, _manager.getState());
+
+ try
+ {
+ _manager.stateTransition(State.ACTIVE, State.CLOSING);
+ fail("Exception not thrown");
+ }
+ catch (IllegalStateException e)
+ {
+ // PASS
+ }
+ assertEquals(State.INITIAL, _manager.getState());
+ }
+
+ public void testIsInState()
+ {
+ assertEquals(State.INITIAL, _manager.getState());
+ assertFalse(_manager.isInState(State.ACTIVE));
+ assertTrue(_manager.isInState(State.INITIAL));
+ }
+
+ public void testIsNotInState()
+ {
+ assertEquals(State.INITIAL, _manager.getState());
+ assertTrue(_manager.isNotInState(State.ACTIVE));
+ assertFalse(_manager.isNotInState(State.INITIAL));
+ }
+
+ public void testCheckInState()
+ {
+ assertEquals(State.INITIAL, _manager.getState());
+
+ try
+ {
+ _manager.checkInState(State.ACTIVE);
+ fail("Exception not thrown");
+ }
+ catch (IllegalStateException e)
+ {
+ // PASS
+ }
+ assertEquals(State.INITIAL, _manager.getState());
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
deleted file mode 100644
index 8a261b3b86..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Adds some extra methods to the memory message store for testing purposes.
- */
-public class TestMemoryMessageStore extends MemoryMessageStore
-{
- private AtomicInteger _messageCount = new AtomicInteger(0);
-
-
- public TestMemoryMessageStore()
- {
- }
-
- @Override
- public StoredMessage addMessage(StorableMessageMetaData metaData)
- {
- return new TestableStoredMessage(super.addMessage(metaData));
- }
-
- public int getMessageCount()
- {
- return _messageCount.get();
- }
-
- private class TestableStoredMessage implements StoredMessage
- {
- private final StoredMessage _storedMessage;
-
- public TestableStoredMessage(StoredMessage storedMessage)
- {
- _messageCount.incrementAndGet();
- _storedMessage = storedMessage;
- }
-
- public StorableMessageMetaData getMetaData()
- {
- return _storedMessage.getMetaData();
- }
-
- public long getMessageNumber()
- {
- return _storedMessage.getMessageNumber();
- }
-
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- _storedMessage.addContent(offsetInMessage, src);
- }
-
- public int getContent(int offsetInMessage, ByteBuffer dst)
- {
- return _storedMessage.getContent(offsetInMessage, dst);
- }
-
-
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- return _storedMessage.getContent(offsetInMessage, size);
- }
-
- public StoreFuture flushToStore()
- {
- return _storedMessage.flushToStore();
- }
-
- public void remove()
- {
- _storedMessage.remove();
- _messageCount.decrementAndGet();
- }
-
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index 104e06d29a..210408f490 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -33,26 +34,8 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class TestableMemoryMessageStore extends MemoryMessageStore
{
-
- private MemoryMessageStore _mms = null;
- private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
- private AtomicInteger _messageCount = new AtomicInteger(0);
-
- public TestableMemoryMessageStore(MemoryMessageStore mms)
- {
- _mms = mms;
- }
-
- public TestableMemoryMessageStore()
- {
-
- }
-
- @Override
- public void close() throws Exception
- {
- // Not required to do anything
- }
+ private final Map<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
+ private final AtomicInteger _messageCount = new AtomicInteger(0);
@Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
@@ -65,36 +48,34 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
return _messageCount.get();
}
+ public Map<Long, AMQQueue> getMessages()
+ {
+ return _messages;
+ }
+
private class TestableTransaction implements Transaction
{
+ @Override
public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
getMessages().put(message.getMessageNumber(), (AMQQueue)queue);
}
+ @Override
public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
getMessages().remove(message.getMessageNumber());
}
+ @Override
public void commitTran() throws AMQStoreException
{
}
+ @Override
public StoreFuture commitTranAsync() throws AMQStoreException
{
- return new StoreFuture()
- {
- public boolean isComplete()
- {
- return true;
- }
-
- public void waitForCompletion()
- {
-
- }
- };
+ return StoreFuture.IMMEDIATE_FUTURE;
}
public void abortTran() throws AMQStoreException
@@ -117,10 +98,6 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
return new TestableTransaction();
}
- public HashMap<Long, AMQQueue> getMessages()
- {
- return _messages;
- }
private class TestableStoredMessage implements StoredMessage
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
new file mode 100644
index 0000000000..a737836ed5
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.store.decorators.EventDecorator;
+import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator;
+
+public class TestableMemoryMessageStoreFactory implements MessageStoreFactory
+{
+
+ @Override
+ public MessageStore createMessageStore(LogSubject logSubject)
+ {
+ return new OperationalLoggingDecorator(new EventDecorator(new TestableMemoryMessageStore()), logSubject);
+ }
+
+ @Override
+ public String getStoreClassName()
+ {
+ return TestableMemoryMessageStore.class.getSimpleName();
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java
new file mode 100644
index 0000000000..7038b8710b
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.decorators;
+
+import static org.mockito.Mockito.*;
+
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.decorators.EventDecorator;
+import org.mockito.InOrder;
+
+import junit.framework.TestCase;
+
+public class EventDecoratorTest extends TestCase
+{
+ private MessageStore _mockStore = mock(MessageStore.class);
+ private EventListener _mockListener = mock(EventListener.class);
+
+ private EventDecorator _eventDecorator = new EventDecorator(_mockStore);
+ private InOrder _orderMock = inOrder(_mockListener, _mockStore);
+
+ public void testBeforeActivateDecoration() throws Exception
+ {
+ _eventDecorator.addEventListener(_mockListener, Event.BEFORE_ACTIVATE);
+ _eventDecorator.activate();
+
+ _orderMock.verify(_mockListener).event(Event.BEFORE_ACTIVATE);
+ _orderMock.verify(_mockStore).activate();
+ }
+
+ public void testAfterActivateDecoration() throws Exception
+ {
+ _eventDecorator.addEventListener(_mockListener, Event.AFTER_ACTIVATE);
+ _eventDecorator.activate();
+
+ _orderMock.verify(_mockStore).activate();
+ _orderMock.verify(_mockListener).event(Event.AFTER_ACTIVATE);
+ }
+
+ public void testBeforeAfterActivateDecoration() throws Exception
+ {
+ _eventDecorator.addEventListener(_mockListener, Event.BEFORE_ACTIVATE);
+ _eventDecorator.addEventListener(_mockListener, Event.AFTER_ACTIVATE);
+ _eventDecorator.activate();
+
+ _orderMock.verify(_mockListener).event(Event.BEFORE_ACTIVATE);
+ _orderMock.verify(_mockStore).activate();
+ _orderMock.verify(_mockListener).event(Event.AFTER_ACTIVATE);
+ }
+
+ public void testBeforeAfterCloseDecoration() throws Exception
+ {
+ _eventDecorator.addEventListener(_mockListener, Event.BEFORE_CLOSE);
+ _eventDecorator.addEventListener(_mockListener, Event.AFTER_CLOSE);
+ _eventDecorator.close();
+
+ _orderMock.verify(_mockListener).event(Event.BEFORE_CLOSE);
+ _orderMock.verify(_mockStore).close();
+ _orderMock.verify(_mockListener).event(Event.AFTER_CLOSE);
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java
new file mode 100644
index 0000000000..cf06d3ab72
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.decorators;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import static org.apache.qpid.server.store.MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator;
+import org.hamcrest.Description;
+import org.mockito.ArgumentMatcher;
+import org.mockito.InOrder;
+
+public class OperationalLoggingDecoratorTest extends TestCase
+{
+ private MessageStore _messageStore = mock(MessageStore.class);
+ private LogActor _mockActor = mock(LogActor.class);
+ private LogSubject _mockLogSubject = mock(LogSubject.class);
+ private OperationalLoggingDecorator _operationalLoggingDecorator = new OperationalLoggingDecorator(_messageStore, _mockLogSubject);
+ private InOrder _inOrder = inOrder(_mockActor, _messageStore);
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ CurrentActor.set(_mockActor);
+ }
+
+ public void testConfigureMessageStore() throws Exception
+ {
+ _operationalLoggingDecorator.configureMessageStore(null,null,null,null);
+
+ _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1001 : Created"));
+ _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("TXN-1001 : Created"));
+ _inOrder.verify(_messageStore).configureMessageStore(anyString(), any(MessageStoreRecoveryHandler.class), any(TransactionLogRecoveryHandler.class), any(Configuration.class));
+ }
+
+ public void testConfigureMessageStoreWithStoreLocation() throws Exception
+ {
+ final String storeLocation = "/my/store/location";
+ Configuration mockConfig = mock(Configuration.class);
+ when(mockConfig.getString(ENVIRONMENT_PATH_PROPERTY)).thenReturn(storeLocation);
+
+ _operationalLoggingDecorator.configureMessageStore(null,null,null, mockConfig);
+
+ _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1001 : Created"));
+ _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("TXN-1001 : Created"));
+ _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1002 : Store location : " + storeLocation));
+ _inOrder.verify(_messageStore).configureMessageStore(anyString(), any(MessageStoreRecoveryHandler.class), any(TransactionLogRecoveryHandler.class), any(Configuration.class));
+ }
+
+ public void testConfigureConfigStore() throws Exception
+ {
+ _operationalLoggingDecorator.configureConfigStore(null,null,null);
+
+ _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("CFG-1001 : Created"));
+ _inOrder.verify(_messageStore).configureConfigStore(anyString(), any(ConfigurationRecoveryHandler.class), any(Configuration.class));
+ }
+
+ public void testActivate() throws Exception
+ {
+ _operationalLoggingDecorator.activate();
+
+ _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1004 : Recovery Start"));
+ _inOrder.verify(_messageStore).activate();
+ _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1006 : Recovery Complete"));
+ }
+
+ public void testClose() throws Exception
+ {
+ _operationalLoggingDecorator.close();
+
+ _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1003 : Closed"));
+ _inOrder.verify(_messageStore).close();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ CurrentActor.remove();
+ }
+
+ private LogMessage matchesLogMessage(String expectedLogMessage)
+ {
+ return argThat(new LogMessageArgumentMatcher(expectedLogMessage));
+ }
+
+ private final class LogMessageArgumentMatcher extends ArgumentMatcher<LogMessage>
+ {
+ private final String _expectedText;
+ private String _description = null;
+;
+ public LogMessageArgumentMatcher(String _expectedLogMessage)
+ {
+ this._expectedText = _expectedLogMessage;
+ }
+
+ @Override
+ public boolean matches(Object item)
+ {
+ LogMessage logMessage = (LogMessage) item;
+ final String actualText = logMessage.toString();
+ if (actualText.equals(_expectedText))
+ {
+ return true;
+ }
+ else
+ {
+ _description = "Expected <" + _expectedText + "> but got <" + actualText + ">";
+ return false;
+ }
+ }
+
+ @Override
+ public void describeTo(Description description)
+ {
+ if (description != null)
+ {
+ description.appendText(" : "+ _description);
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
index e9b7ceacc5..af49238998 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
@@ -20,26 +20,13 @@
*/
package org.apache.qpid.server.txn;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.NotImplementedException;
-
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.NullMessageStore;
import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
/**
@@ -129,111 +116,14 @@ class MockStoreTransaction implements Transaction
public static MessageStore createTestTransactionLog(final MockStoreTransaction storeTransaction)
{
- return new MessageStore()
+ return new NullMessageStore()
{
- public void configureMessageStore(final String name,
- final MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler,
- final Configuration config, final LogSubject logSubject) throws Exception
- {
- }
-
- public void close() throws Exception
- {
- }
-
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
- {
- return null;
- }
-
- public boolean isPersistent()
- {
- return false;
- }
-
+ @Override
public Transaction newTransaction()
{
storeTransaction.setState(TransactionState.STARTED);
return storeTransaction;
}
-
- @Override
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- Configuration config, LogSubject logSubject)
- throws Exception
- {
- }
-
- @Override
- public void createExchange(Exchange exchange)
- throws AMQStoreException
- {
- }
-
- @Override
- public void removeExchange(Exchange exchange)
- throws AMQStoreException
- {
- }
-
- @Override
- public void bindQueue(Exchange exchange, AMQShortString routingKey,
- AMQQueue queue, FieldTable args) throws AMQStoreException
- {
- }
-
- @Override
- public void unbindQueue(Exchange exchange,
- AMQShortString routingKey, AMQQueue queue, FieldTable args)
- throws AMQStoreException
- {
- }
-
- @Override
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- }
-
- @Override
- public void createQueue(AMQQueue queue, FieldTable arguments)
- throws AMQStoreException
- {
- }
-
- @Override
- public void removeQueue(AMQQueue queue) throws AMQStoreException
- {
- }
-
- @Override
- public void updateQueue(AMQQueue queue) throws AMQStoreException
- {
- }
-
- @Override
- public void createBrokerLink(BrokerLink link)
- throws AMQStoreException
- {
- }
-
- @Override
- public void deleteBrokerLink(BrokerLink link)
- throws AMQStoreException
- {
- }
-
- @Override
- public void createBridge(Bridge bridge) throws AMQStoreException
- {
- }
-
- @Override
- public void deleteBridge(Bridge bridge) throws AMQStoreException
- {
- }
-
- };
+ };
}
} \ No newline at end of file
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 9df0aec545..6b48d55fae 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -43,6 +43,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -65,10 +66,10 @@ public class InternalBrokerBaseCase extends QpidTestCase
super.setUp();
_configXml.addProperty("virtualhosts.virtualhost.name", "test");
- _configXml.addProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+ _configXml.addProperty("virtualhosts.virtualhost.test.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
_configXml.addProperty("virtualhosts.virtualhost(-1).name", getName());
- _configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
+ _configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
createBroker();
}
@@ -97,7 +98,7 @@ public class InternalBrokerBaseCase extends QpidTestCase
_virtualHost.getBindingFactory().addBinding(QUEUE_NAME.toString(), _queue, defaultExchange, null);
_virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
- _messageStore = _virtualHost.getMessageStore();
+ _messageStore = _virtualHost.getMessageStore().getUnderlyingStore();
_queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()), false, new AMQShortString("testowner"),
false, false, _virtualHost, null);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index cccf02c9f3..f27dc33dc3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -275,4 +275,10 @@ public class MockVirtualHost implements VirtualHost
{
}
+
+ @Override
+ public State getState()
+ {
+ return State.ACTIVE;
+ }
} \ No newline at end of file
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java
index df7b4da426..87eb0f9d16 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java
@@ -27,7 +27,7 @@ import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStoreFactory;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -68,31 +68,6 @@ public class VirtualHostImplTest extends QpidTestCase
customBindingTestImpl(new String[0]);
}
- private void customBindingTestImpl(final String[] routingKeys) throws Exception
- {
- String exchangeName = getName() +".direct";
- String vhostName = getName();
- String queueName = getName();
-
- File config = writeConfigFile(vhostName, queueName, exchangeName, false, routingKeys);
- VirtualHost vhost = createVirtualHost(vhostName, config);
- assertNotNull("virtualhost should exist", vhost);
-
- AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName);
- assertNotNull("queue should exist", queue);
-
- Exchange defaultExch = vhost.getExchangeRegistry().getDefaultExchange();
- assertTrue("queue should have been bound to default exchange with its name", defaultExch.isBound(queueName, queue));
-
- Exchange exch = vhost.getExchangeRegistry().getExchange(exchangeName);
- assertTrue("queue should have been bound to " + exchangeName + " with its name", exch.isBound(queueName, queue));
-
- for(String key: routingKeys)
- {
- assertTrue("queue should have been bound to " + exchangeName + " with key " + key, exch.isBound(key, queue));
- }
- }
-
/**
* Tests that specifying custom routing keys for a queue in the configuration file results in failure
* to create the vhost (since this is illegal, only queue names are used with the default exchange)
@@ -106,12 +81,32 @@ public class VirtualHostImplTest extends QpidTestCase
createVirtualHost(getName(), config);
fail("virtualhost creation should have failed due to illegal configuration");
}
- catch (ConfigurationException e)
+ catch (RuntimeException e)
{
+ assertEquals(ConfigurationException.class, e.getCause().getClass());
//expected
}
}
+ public void testVirtualHostBecomesActive() throws Exception
+ {
+ File config = writeConfigFile(getName(), getName(), getName() +".direct", false, new String[0]);
+ VirtualHost vhost = createVirtualHost(getName(), config);
+ assertNotNull(vhost);
+ assertEquals(State.ACTIVE, vhost.getState());
+ }
+
+ public void testVirtualHostBecomesStoppedOnClose() throws Exception
+ {
+ File config = writeConfigFile(getName(), getName(), getName() +".direct", false, new String[0]);
+ VirtualHost vhost = createVirtualHost(getName(), config);
+ assertNotNull(vhost);
+ assertEquals(State.ACTIVE, vhost.getState());
+ vhost.close();
+ assertEquals(State.STOPPED, vhost.getState());
+ assertEquals(0, vhost.getHouseKeepingActiveCount());
+ }
+
/**
* Tests that specifying an unknown exchange to bind the queue to results in failure to create the vhost
*/
@@ -124,12 +119,39 @@ public class VirtualHostImplTest extends QpidTestCase
createVirtualHost(getName(), config);
fail("virtualhost creation should have failed due to illegal configuration");
}
- catch (ConfigurationException e)
+ catch (RuntimeException e)
{
+ assertEquals(ConfigurationException.class, e.getCause().getClass());
//expected
}
}
+ private void customBindingTestImpl(final String[] routingKeys) throws Exception
+ {
+ String exchangeName = getName() +".direct";
+ String vhostName = getName();
+ String queueName = getName();
+
+ File config = writeConfigFile(vhostName, queueName, exchangeName, false, routingKeys);
+ VirtualHost vhost = createVirtualHost(vhostName, config);
+ assertNotNull("virtualhost should exist", vhost);
+
+ AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName);
+ assertNotNull("queue should exist", queue);
+
+ Exchange defaultExch = vhost.getExchangeRegistry().getDefaultExchange();
+ assertTrue("queue should have been bound to default exchange with its name", defaultExch.isBound(queueName, queue));
+
+ Exchange exch = vhost.getExchangeRegistry().getExchange(exchangeName);
+ assertTrue("queue should have been bound to " + exchangeName + " with its name", exch.isBound(queueName, queue));
+
+ for(String key: routingKeys)
+ {
+ assertTrue("queue should have been bound to " + exchangeName + " with key " + key, exch.isBound(key, queue));
+ }
+ }
+
+
private VirtualHost createVirtualHost(String vhostName, File config) throws Exception
{
_configuration = new ServerConfiguration(new XMLConfiguration(config));
@@ -167,11 +189,11 @@ public class VirtualHostImplTest extends QpidTestCase
writer.write("<virtualhosts>");
writer.write(" <default>" + vhostName + "</default>");
writer.write(" <virtualhost>");
- writer.write(" <store>");
- writer.write(" <class>" + TestableMemoryMessageStore.class.getName() + "</class>");
- writer.write(" </store>");
writer.write(" <name>" + vhostName + "</name>");
writer.write(" <" + vhostName + ">");
+ writer.write(" <store>");
+ writer.write(" <factoryclass>" + MemoryMessageStoreFactory.class.getName() + "</factoryclass>");
+ writer.write(" </store>");
if(exchangeName != null && !dontDeclare)
{
writer.write(" <exchanges>");