summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-05-17 17:26:04 +0000
committerRobert Gemmell <robbie@apache.org>2012-05-17 17:26:04 +0000
commitf5d67044a9797c397764a7ac1aa1a1ed4aa893a3 (patch)
treecf8a9cf6a5f741e31417ca4d32a6b708bb3b9fdd /qpid/java/broker/src
parentf523b9e510fc90ce3f7f7d7c2960f3bfee3d42df (diff)
downloadqpid-python-f5d67044a9797c397764a7ac1aa1a1ed4aa893a3.tar.gz
QPID-4006: add support for using BDB HA to form an active-passive cluster for persistent messaging
- Includes support for setting BDB configuration parameters via the store configuration, both for the existing store and the new HA variant. - Removes the MessageStoreFactory and reverts store configuration to historical values. Applied patch from Keith Wall, Andrew MacBean <andymacbean@gmail.com>, Oleksandr Rudyy <orudyy@gmail.com>, Philip Harvey <phil@philharveyonline.com>, and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1339728 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/HAMessageStore.java (renamed from qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java)10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java37
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java27
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java33
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java40
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java90
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java13
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java62
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java37
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java4
22 files changed, 168 insertions, 257 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 5f472b6ddd..59c6926b76 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -32,7 +32,6 @@ import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.MemoryMessageStoreFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -103,14 +102,14 @@ public class VirtualHostConfiguration extends ConfigurationPlugin
return getConfig().subset("store");
}
- public String getMessageStoreFactoryClass()
+ public String getMessageStoreClass()
{
- return getStringValue("store.factoryclass", MemoryMessageStoreFactory.class.getName());
+ return getStringValue("store.class", MemoryMessageStore.class.getName());
}
- public void setMessageStoreFactoryClass(String storeFactoryClass)
+ public void setMessageStoreClass(String storeFactoryClass)
{
- getConfig().setProperty("store.factoryclass", storeFactoryClass);
+ getConfig().setProperty("store.class", storeFactoryClass);
}
public List getExchanges()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
index 9b5ceef35f..c681126c11 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
@@ -23,12 +23,20 @@ public enum Event
{
BEFORE_INIT,
AFTER_INIT,
+
BEFORE_ACTIVATE,
AFTER_ACTIVATE,
+
BEFORE_PASSIVATE,
AFTER_PASSIVATE,
+
BEFORE_CLOSE,
AFTER_CLOSE,
+
+ BEFORE_QUIESCE,
+ AFTER_QUIESCE,
+ BEFORE_RESTART,
+
PERSISTENT_MESSAGE_SIZE_OVERFULL,
- PERSISTENT_MESSAGE_SIZE_UNDERFULL,
+ PERSISTENT_MESSAGE_SIZE_UNDERFULL
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java
index 21ae3924b8..bf3de2611d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java
@@ -24,9 +24,12 @@ import java.util.EnumMap;
import java.util.List;
import java.util.Map;
+import org.apache.log4j.Logger;
+
public class EventManager
{
private Map<Event, List<EventListener>> _listeners = new EnumMap<Event, List<EventListener>> (Event.class);
+ private static final Logger _LOGGER = Logger.getLogger(EventManager.class);
public synchronized void addEventListener(EventListener listener, Event... events)
{
@@ -46,6 +49,11 @@ public class EventManager
{
if (_listeners.containsKey(event))
{
+ if(_LOGGER.isDebugEnabled())
+ {
+ _LOGGER.debug("Received event " + event);
+ }
+
for (EventListener listener : _listeners.get(event))
{
listener.event(event);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/HAMessageStore.java
index a35db62b03..59483751ca 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/HAMessageStore.java
@@ -19,9 +19,11 @@
*/
package org.apache.qpid.server.store;
-public interface MessageStoreFactory
+public interface HAMessageStore extends MessageStore
{
- MessageStore createMessageStore();
-
- String getStoreClassName();
+ /**
+ * Used to indicate that a store requires to make itself unavailable for read and read/write
+ * operations.
+ */
+ void passivate();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 59624b7a75..7b98b30860 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -83,19 +83,19 @@ public class MemoryMessageStore extends NullMessageStore
@Override
public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
{
- _stateManager.attainState(State.CONFIGURING);
+ _stateManager.attainState(State.INITIALISING);
}
@Override
public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
{
- _stateManager.attainState(State.CONFIGURED);
+ _stateManager.attainState(State.INITIALISED);
}
@Override
public void activate() throws Exception
{
- _stateManager.attainState(State.RECOVERING);
+ _stateManager.attainState(State.ACTIVATING);
_stateManager.attainState(State.ACTIVE);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
deleted file mode 100644
index 8724f102c6..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-public class MemoryMessageStoreFactory implements MessageStoreFactory
-{
-
- @Override
- public MessageStore createMessageStore()
- {
- return new MemoryMessageStore();
- }
-
- @Override
- public String getStoreClassName()
- {
- return MemoryMessageStore.class.getSimpleName();
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java
index 7cbdede85e..2783637b2a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java
@@ -20,19 +20,30 @@
*/
package org.apache.qpid.server.store;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+
public enum State
{
-
+ /** The initial state of the store. In practice, the store immediately transitions to the subsequent states. */
INITIAL,
- CONFIGURING,
- CONFIGURED,
- RECOVERING,
+
+ INITIALISING,
+ /**
+ * The initial set-up of the store has completed.
+ * If the store is persistent, it has not yet loaded configuration for {@link ConfiguredObject}'s from disk.
+ *
+ * From the point of view of the user, the store is essentially stopped.
+ */
+ INITIALISED,
+
+ ACTIVATING,
ACTIVE,
- QUIESCING,
- QUIESCED,
+
CLOSING,
- CLOSED;
-
+ CLOSED,
+ QUIESCING,
+ /** The virtual host (and implicitly also the store) has been manually paused by the user to allow configuration changes to take place */
+ QUIESCED;
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
index 5998be5bb6..613b329beb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
@@ -24,6 +24,8 @@ package org.apache.qpid.server.store;
import java.util.EnumMap;
import java.util.Map;
+import org.apache.qpid.server.store.StateManager.Transition;
+
public class StateManager
{
private State _state = State.INITIAL;
@@ -70,16 +72,23 @@ public class StateManager
}
- public static final Transition CONFIGURE = new Transition(State.INITIAL, State.CONFIGURING, Event.BEFORE_INIT);
- public static final Transition CONFIGURE_COMPLETE = new Transition(State.CONFIGURING, State.CONFIGURED, Event.AFTER_INIT);
- public static final Transition RECOVER = new Transition(State.CONFIGURED, State.RECOVERING, Event.BEFORE_ACTIVATE);
- public static final Transition ACTIVATE = new Transition(State.RECOVERING, State.ACTIVE, Event.AFTER_ACTIVATE);
+ public static final Transition INITIALISE = new Transition(State.INITIAL, State.INITIALISING, Event.BEFORE_INIT);
+ public static final Transition INITALISE_COMPLETE = new Transition(State.INITIALISING, State.INITIALISED, Event.AFTER_INIT);
+
+ public static final Transition ACTIVATE = new Transition(State.INITIALISED, State.ACTIVATING, Event.BEFORE_ACTIVATE);
+ public static final Transition ACTIVATE_COMPLETE = new Transition(State.ACTIVATING, State.ACTIVE, Event.AFTER_ACTIVATE);
+
+ public static final Transition CLOSE_INITIALISED = new Transition(State.INITIALISED, State.CLOSING, Event.BEFORE_CLOSE);;
public static final Transition CLOSE_ACTIVE = new Transition(State.ACTIVE, State.CLOSING, Event.BEFORE_CLOSE);
public static final Transition CLOSE_QUIESCED = new Transition(State.QUIESCED, State.CLOSING, Event.BEFORE_CLOSE);
public static final Transition CLOSE_COMPLETE = new Transition(State.CLOSING, State.CLOSED, Event.AFTER_CLOSE);
- public static final Transition QUIESCE = new Transition(State.ACTIVE, State.QUIESCING, Event.BEFORE_PASSIVATE);
- public static final Transition QUIESCE_COMPLETE = new Transition(State.QUIESCING, State.QUIESCED, Event.BEFORE_PASSIVATE);
- public static final Transition RESTART = new Transition(State.QUIESCED, State.RECOVERING, Event.BEFORE_ACTIVATE);
+
+ public static final Transition PASSIVATE = new Transition(State.ACTIVE, State.INITIALISED, Event.BEFORE_PASSIVATE);
+
+ public static final Transition QUIESCE = new Transition(State.ACTIVE, State.QUIESCING, Event.BEFORE_QUIESCE);
+ public static final Transition QUIESCE_COMPLETE = new Transition(State.QUIESCING, State.QUIESCED, Event.AFTER_QUIESCE);
+
+ public static final Transition RESTART = new Transition(State.QUIESCED, State.ACTIVATING, Event.BEFORE_RESTART);
public StateManager(final EventManager eventManager)
@@ -105,16 +114,6 @@ public class StateManager
return _state;
}
- public synchronized void stateTransition(final State current, final State desired)
- {
- if (_state != current)
- {
- throw new IllegalStateException("Cannot transition to the state: " + desired + "; need to be in state: " + current
- + "; currently in state: " + _state);
- }
- attainState(desired);
- }
-
public synchronized void attainState(State desired)
{
Transition transition = null;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index de1ce1a9db..c065eb263b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -260,7 +260,7 @@ public class DerbyMessageStore implements MessageStore
ConfigurationRecoveryHandler configRecoveryHandler,
Configuration storeConfiguration) throws Exception
{
- _stateManager.attainState(State.CONFIGURING);
+ _stateManager.attainState(State.INITIALISING);
_configRecoveryHandler = configRecoveryHandler;
commonConfiguration(name, storeConfiguration);
@@ -276,13 +276,13 @@ public class DerbyMessageStore implements MessageStore
_tlogRecoveryHandler = tlogRecoveryHandler;
_messageRecoveryHandler = recoveryHandler;
- _stateManager.attainState(State.CONFIGURED);
+ _stateManager.attainState(State.INITIALISED);
}
@Override
public void activate() throws Exception
{
- _stateManager.attainState(State.RECOVERING);
+ _stateManager.attainState(State.ACTIVATING);
// this recovers durable exchanges, queues, and bindings
recoverConfiguration(_configRecoveryHandler);
@@ -716,7 +716,7 @@ public class DerbyMessageStore implements MessageStore
public void close() throws Exception
{
_closed.getAndSet(true);
- _stateManager.stateTransition(State.ACTIVE, State.CLOSING);
+ _stateManager.attainState(State.CLOSING);
try
{
@@ -737,7 +737,7 @@ public class DerbyMessageStore implements MessageStore
}
}
- _stateManager.stateTransition(State.CLOSING, State.CLOSED);
+ _stateManager.attainState(State.CLOSED);
}
@Override
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
deleted file mode 100644
index 12d7f64a8d..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.derby;
-
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreFactory;
-
-public class DerbyMessageStoreFactory implements MessageStoreFactory
-{
-
- @Override
- public MessageStore createMessageStore()
- {
- return new DerbyMessageStore();
- }
-
- @Override
- public String getStoreClassName()
- {
- return DerbyMessageStore.class.getSimpleName();
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index b05025467d..5a14092930 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -59,8 +59,8 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.HAMessageStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreFactory;
import org.apache.qpid.server.store.OperationalLoggingListener;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
@@ -173,7 +173,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
- _messageStore = initialiseMessageStore(hostConfig.getMessageStoreFactoryClass());
+ _messageStore = initialiseMessageStore(hostConfig.getMessageStoreClass());
configureMessageStore(hostConfig);
@@ -329,20 +329,19 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
}
- private MessageStore initialiseMessageStore(final String messageStoreFactoryClass) throws Exception
+ private MessageStore initialiseMessageStore(final String messageStoreClass) throws Exception
{
- final Class<?> clazz = Class.forName(messageStoreFactoryClass);
+ final Class<?> clazz = Class.forName(messageStoreClass);
final Object o = clazz.newInstance();
- if (!(o instanceof MessageStoreFactory))
+ if (!(o instanceof MessageStore))
{
- throw new ClassCastException("Message store factory class must implement " + MessageStoreFactory.class +
+ throw new ClassCastException("Message store factory class must implement " + MessageStore.class +
". Class " + clazz + " does not.");
}
- final MessageStoreFactory messageStoreFactory = (MessageStoreFactory) o;
- final MessageStore messageStore = messageStoreFactory.createMessageStore();
- final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStoreFactory.getStoreClassName());
+ final MessageStore messageStore = (MessageStore) o;
+ final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, clazz.getSimpleName());
OperationalLoggingListener.listen(messageStore, storeLogSubject);
messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE);
@@ -366,7 +365,10 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
private void activateNonHAMessageStore() throws Exception
{
- _messageStore.activate();
+ if (!(_messageStore instanceof HAMessageStore))
+ {
+ _messageStore.activate();
+ }
}
private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
@@ -801,42 +803,42 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
}
private final class BeforeActivationListener implements EventListener
- {
- @Override
- public void event(Event event)
- {
- try
- {
- _exchangeRegistry.initialise();
- initialiseModel(_vhostConfig);
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed to initialise virtual host after state change", e);
- }
- }
- }
-
- private final class AfterActivationListener implements EventListener
- {
- @Override
- public void event(Event event)
- {
- initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
- try
- {
- _brokerMBean.register();
- }
- catch (JMException e)
- {
- throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
- }
+ {
+ @Override
+ public void event(Event event)
+ {
+ try
+ {
+ _exchangeRegistry.initialise();
+ initialiseModel(_vhostConfig);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to initialise virtual host after state change", e);
+ }
+ }
+ }
+
+ private final class AfterActivationListener implements EventListener
+ {
+ @Override
+ public void event(Event event)
+ {
+ initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+ try
+ {
+ _brokerMBean.register();
+ }
+ catch (JMException e)
+ {
+ throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
+ }
- _state = State.ACTIVE;
- }
- }
+ _state = State.ACTIVE;
+ }
+ }
- public class BeforePassivationListener implements EventListener
+ private final class BeforePassivationListener implements EventListener
{
public void event(Event event)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
index d26e286c90..d34d1bbef3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
@@ -36,7 +36,7 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -165,7 +165,7 @@ public class AMQBrokerManagerMBeanTest extends QpidTestCase
XMLConfiguration configXml = new XMLConfiguration();
configXml.addProperty("virtualhosts.virtualhost(-1).name", "test");
- configXml.addProperty("virtualhosts.virtualhost(-1).test.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
+ configXml.addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName());
ServerConfiguration configuration = new ServerConfiguration(configXml);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
index c4c93acfb6..50e7f0588b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
@@ -27,7 +27,6 @@ import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -162,7 +161,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "r2d2");
getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues.queue.r2d2.deadLetterQueues", "true");
getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "c3p0");
- getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.class", TestableMemoryMessageStore.class.getName());
// Start the broker now.
super.createBroker();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index e123a968a4..337ff194c3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import java.util.UUID;
-
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.AMQException;
@@ -37,7 +35,6 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -56,7 +53,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
XMLConfiguration configXml = new XMLConfiguration();
configXml.addProperty("virtualhosts.virtualhost(-1).name", getName());
- configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
+ configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
ServerConfiguration configuration = new ServerConfiguration(configXml);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 52ad4a7c5b..a8fad96063 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -43,7 +43,6 @@ import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -108,7 +107,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
PropertiesConfiguration env = new PropertiesConfiguration();
final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration(getClass().getName(), env);
- vhostConfig.setMessageStoreFactoryClass(TestableMemoryMessageStoreFactory.class.getName());
+ vhostConfig.setMessageStoreClass(TestableMemoryMessageStore.class.getName());
_virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), vhostConfig);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
index a1cbb2cbc8..48e631a0f4 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
-import org.apache.qpid.server.logging.subjects.TestBlankSubject;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
@@ -41,7 +40,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecover
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.Transaction.Record;
-import org.apache.qpid.server.store.derby.DerbyMessageStoreFactory;
+import org.apache.qpid.server.store.derby.DerbyMessageStore;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -262,14 +261,14 @@ public class DurableConfigurationStoreTest extends QpidTestCase
protected MessageStore createStore() throws Exception
{
- String storeFactoryClass = System.getProperty(MS_FACTORY_CLASS_NAME_KEY);
- if (storeFactoryClass == null)
+ String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY);
+ if (storeClass == null)
{
- storeFactoryClass = DerbyMessageStoreFactory.class.getName();
+ storeClass = DerbyMessageStore.class.getName();
}
CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
- MessageStoreFactory factory = (MessageStoreFactory) Class.forName(storeFactoryClass).newInstance();
- return factory.createMessageStore();
+ MessageStore messageStore = (MessageStore) Class.forName(storeClass).newInstance();
+ return messageStore;
}
public void testRecordXid() throws Exception
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 3fb0776083..64048d294b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -59,7 +59,6 @@ import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
/**
* This tests the MessageStores by using the available interfaces.
@@ -103,7 +102,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
String storePath = System.getProperty("QPID_WORK") + "/" + getName();
_config = new PropertiesConfiguration();
- _config.addProperty("store.factoryclass", getTestProfileMessageStoreFactoryClassName());
+ _config.addProperty("store.class", getTestProfileMessageStoreClassName());
_config.addProperty("store.environment-path", storePath);
cleanup(new File(storePath));
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
index 42746f9119..c6ef35d255 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
@@ -73,11 +73,11 @@ public class OperationalLoggingListenerTest extends TestCase
}
- messageStore.attainState(State.CONFIGURING);
+ messageStore.attainState(State.INITIALISING);
assertEquals("Unexpected number of operational log messages on configuring", 1, messages.size());
assertEquals(messages.remove(0).toString(), ConfigStoreMessages.CREATED().toString());
- messageStore.attainState(State.CONFIGURED);
+ messageStore.attainState(State.INITIALISED);
assertEquals("Unexpected number of operational log messages on CONFIGURED", setStoreLocation ? 3 : 2, messages.size());
assertEquals(messages.remove(0).toString(), MessageStoreMessages.CREATED().toString());
assertEquals(messages.remove(0).toString(), TransactionLogMessages.CREATED().toString());
@@ -86,7 +86,7 @@ public class OperationalLoggingListenerTest extends TestCase
assertEquals(messages.remove(0).toString(), MessageStoreMessages.STORE_LOCATION(STORE_LOCATION).toString());
}
- messageStore.attainState(State.RECOVERING);
+ messageStore.attainState(State.ACTIVATING);
assertEquals("Unexpected number of operational log messages on RECOVERING", 1, messages.size());
assertEquals(messages.remove(0).toString(), MessageStoreMessages.RECOVERY_START().toString());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
index 97c88ca1d3..18efb976eb 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
@@ -45,8 +45,8 @@ public class StateManagerTest extends TestCase implements EventListener
{
assertEquals(State.INITIAL, _manager.getState());
- _manager.stateTransition(State.INITIAL, State.CONFIGURING);
- assertEquals(State.CONFIGURING, _manager.getState());
+ _manager.attainState(State.INITIALISING);
+ assertEquals(State.INITIALISING, _manager.getState());
}
public void testStateTransitionDisallowed()
@@ -55,7 +55,7 @@ public class StateManagerTest extends TestCase implements EventListener
try
{
- _manager.stateTransition(State.ACTIVE, State.CLOSING);
+ _manager.attainState(State.CLOSING);
fail("Exception not thrown");
}
catch (IllegalStateException e)
@@ -98,22 +98,29 @@ public class StateManagerTest extends TestCase implements EventListener
public void testValidStateTransitions()
{
assertEquals(State.INITIAL, _manager.getState());
- performValidTransition(StateManager.CONFIGURE);
- performValidTransition(StateManager.CONFIGURE_COMPLETE);
- performValidTransition(StateManager.RECOVER);
+ performValidTransition(StateManager.INITIALISE);
+ performValidTransition(StateManager.INITALISE_COMPLETE);
performValidTransition(StateManager.ACTIVATE);
+ performValidTransition(StateManager.ACTIVATE_COMPLETE);
performValidTransition(StateManager.QUIESCE);
performValidTransition(StateManager.QUIESCE_COMPLETE);
performValidTransition(StateManager.RESTART);
- performValidTransition(StateManager.ACTIVATE);
+ performValidTransition(StateManager.ACTIVATE_COMPLETE);
performValidTransition(StateManager.CLOSE_ACTIVE);
performValidTransition(StateManager.CLOSE_COMPLETE);
+
+ _manager = new StateManager(this);
+ assertEquals(State.INITIAL, _manager.getState());
+ performValidTransition(StateManager.INITIALISE);
+ performValidTransition(StateManager.INITALISE_COMPLETE);
+ performValidTransition(StateManager.CLOSE_INITIALISED);
+ performValidTransition(StateManager.CLOSE_COMPLETE);
_manager = new StateManager(this);
- performValidTransition(StateManager.CONFIGURE);
- performValidTransition(StateManager.CONFIGURE_COMPLETE);
- performValidTransition(StateManager.RECOVER);
+ performValidTransition(StateManager.INITIALISE);
+ performValidTransition(StateManager.INITALISE_COMPLETE);
performValidTransition(StateManager.ACTIVATE);
+ performValidTransition(StateManager.ACTIVATE_COMPLETE);
performValidTransition(StateManager.QUIESCE);
performValidTransition(StateManager.QUIESCE_COMPLETE);
performValidTransition(StateManager.CLOSE_QUIESCED);
@@ -132,54 +139,50 @@ public class StateManagerTest extends TestCase implements EventListener
{
assertEquals(State.INITIAL, _manager.getState());
-
- performInvalidTransitions(StateManager.CONFIGURE, State.CONFIGURED);
- performInvalidTransitions(StateManager.CONFIGURE_COMPLETE, State.RECOVERING);
- performInvalidTransitions(StateManager.RECOVER, State.ACTIVE);
- performInvalidTransitions(StateManager.ACTIVATE, State.QUIESCING, State.CLOSING);
+ performInvalidTransitions(StateManager.INITIALISE, State.INITIALISED);
+ performInvalidTransitions(StateManager.INITALISE_COMPLETE, State.ACTIVATING, State.CLOSING);
+ performInvalidTransitions(StateManager.ACTIVATE, State.ACTIVE);
+ performInvalidTransitions(StateManager.ACTIVATE_COMPLETE, State.QUIESCING, State.CLOSING, State.INITIALISED);
performInvalidTransitions(StateManager.QUIESCE, State.QUIESCED);
- performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.RECOVERING, State.CLOSING);
+ performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.ACTIVATING, State.CLOSING);
performInvalidTransitions(StateManager.CLOSE_QUIESCED, State.CLOSED);
performInvalidTransitions(StateManager.CLOSE_COMPLETE);
-
-
-
}
- private void performInvalidTransitions(StateManager.Transition preTransition, State... validTransitions)
+ private void performInvalidTransitions(StateManager.Transition preTransition, State... validEndStates)
{
if(preTransition != null)
{
performValidTransition(preTransition);
}
- EnumSet<State> nextStates = EnumSet.allOf(State.class);
+ EnumSet<State> endStates = EnumSet.allOf(State.class);
- if(validTransitions != null)
+ if(validEndStates != null)
{
- for(State state: validTransitions)
+ for(State state: validEndStates)
{
- nextStates.remove(state);
+ endStates.remove(state);
}
}
- for(State nextState : nextStates)
+ for(State invalidEndState : endStates)
{
- performInvalidStateTransition(nextState);
+ performInvalidStateTransition(invalidEndState);
}
}
- private void performInvalidStateTransition(State state)
+ private void performInvalidStateTransition(State invalidEndState)
{
try
{
_event = null;
State startState = _manager.getState();
- _manager.attainState(state);
- fail("Invalid state transition performed: " + startState + " to " + state);
+ _manager.attainState(invalidEndState);
+ fail("Invalid state transition performed: " + startState + " to " + invalidEndState);
}
catch(IllegalStateException e)
{
@@ -188,6 +191,7 @@ public class StateManagerTest extends TestCase implements EventListener
assertNull("No event should have be fired", _event);
}
+ @Override
public void event(Event event)
{
_event = event;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
deleted file mode 100644
index 44070f22ad..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-public class TestableMemoryMessageStoreFactory implements MessageStoreFactory
-{
-
- @Override
- public MessageStore createMessageStore()
- {
- return new TestableMemoryMessageStore();
- }
-
- @Override
- public String getStoreClassName()
- {
- return TestableMemoryMessageStore.class.getSimpleName();
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index f8200bf1cd..8a18aaff9e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -44,7 +44,6 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -67,10 +66,10 @@ public class InternalBrokerBaseCase extends QpidTestCase
super.setUp();
_configXml.addProperty("virtualhosts.virtualhost.name", "test");
- _configXml.addProperty("virtualhosts.virtualhost.test.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
+ _configXml.addProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
_configXml.addProperty("virtualhosts.virtualhost(-1).name", getName());
- _configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
+ _configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
createBroker();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java
index 87eb0f9d16..b8ba76e43d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java
@@ -27,7 +27,7 @@ import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MemoryMessageStoreFactory;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -192,7 +192,7 @@ public class VirtualHostImplTest extends QpidTestCase
writer.write(" <name>" + vhostName + "</name>");
writer.write(" <" + vhostName + ">");
writer.write(" <store>");
- writer.write(" <factoryclass>" + MemoryMessageStoreFactory.class.getName() + "</factoryclass>");
+ writer.write(" <class>" + MemoryMessageStore.class.getName() + "</class>");
writer.write(" </store>");
if(exchangeName != null && !dontDeclare)
{