summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-07-31 12:02:10 +0000
committerKeith Wall <kwall@apache.org>2014-07-31 12:02:10 +0000
commitee4496653ff9139ab13f57dfaaa081df40714a45 (patch)
treee1dcd21bc97ae6526c6896d63a13489d553fc31a /qpid/java/broker-core
parent660fb9713deb72f0ce35a5ca960f6e37dc14ab14 (diff)
downloadqpid-python-ee4496653ff9139ab13f57dfaaa081df40714a45.tar.gz
QPID-5926: [Java Broker] When transitioning from STOPPED to ACTIVE the virtualhost re-recovers children beneath it
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1614866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java79
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java59
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java66
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java2
7 files changed, 173 insertions, 39 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index e886f913c1..2755c8dba4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -574,7 +574,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- private void applyToChildren(Action<ConfiguredObject<?>> action)
+ protected void applyToChildren(Action<ConfiguredObject<?>> action)
{
for (Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
index 15a2a3cbbd..be7db10773 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
@@ -477,7 +477,7 @@ public class BrokerStoreUpgraderAndRecoverer
public Broker<?> perform(final DurableConfigurationStore store)
{
List<ConfiguredObjectRecord> upgradedRecords = upgrade(store);
- new GenericRecoverer(_systemConfig, Broker.class.getSimpleName()).recover(upgradedRecords);
+ new GenericRecoverer(_systemConfig).recover(upgradedRecords);
final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(store);
applyRecursively(_systemConfig.getBroker(), new Action<ConfiguredObject<?>>()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java
index f281c58c79..58fa852849 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -40,18 +41,16 @@ public class GenericRecoverer
{
private static final Logger LOGGER = Logger.getLogger(GenericRecoverer.class);
- private final ConfiguredObject<?> _parentOfRoot;
- private final String _rootCategory;
+ private final ConfiguredObject<?> _root;
- public GenericRecoverer(ConfiguredObject<?> parentOfRoot, String rootCategory)
+ public GenericRecoverer(ConfiguredObject<?> root)
{
- _parentOfRoot = parentOfRoot;
- _rootCategory = rootCategory;
+ _root = root;
}
public void recover(final List<ConfiguredObjectRecord> records)
{
- _parentOfRoot.getTaskExecutor().run(new VoidTask()
+ _root.getTaskExecutor().run(new VoidTask()
{
@Override
public void execute()
@@ -62,44 +61,70 @@ public class GenericRecoverer
@Override
public String toString()
{
- return _rootCategory + " recovery";
+ return "RecoveringChildrenOf_" + _root.getCategoryClass().getSimpleName();
}
});
-
}
private void performRecover(List<ConfiguredObjectRecord> records)
{
- ConfiguredObjectRecord rootRecord = null;
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Recovering the children of " + _root);
+ }
+
+ records = resolveDiscontinuity(records);
+ resolveObjects(_root, records);
+ }
+
+ private List<ConfiguredObjectRecord> resolveDiscontinuity(final List<ConfiguredObjectRecord> records)
+ {
+ Collection<Class<? extends ConfiguredObject>> childTypesOfRoot = _root.getModel().getChildTypes(_root.getCategoryClass());
+ List<ConfiguredObjectRecord> newRecords = new ArrayList<>(records.size());
+
for (ConfiguredObjectRecord record : records)
{
- if (_rootCategory.equals(record.getType()))
+ if (record.getId().equals(_root.getId()))
{
- rootRecord = record;
- break;
+ // If the parent is already in the records, we skip it, this supports partial recovery
+ // (required when restarting a virtualhost). In the long term, when the objects take responsibility
+ // for the recovery of immediate descendants only, this will disappear.
+ }
+ else if ((record.getParents() == null || record.getParents().size() == 0))
+ {
+ if (containsCategory(childTypesOfRoot, record.getType()))
+ {
+ String parentOfRootCategory = _root.getCategoryClass().getSimpleName();
+ Map<String, UUID> rootParents = Collections.singletonMap(parentOfRootCategory, _root.getId());
+ newRecords.add(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes(), rootParents));
+ }
+ else
+ {
+ throw new IllegalArgumentException("Recovered configured object record " + record
+ + " has no recorded parents and is not a valid child type"
+ + " [" + Arrays.toString(childTypesOfRoot.toArray()) + "]"
+ + " for the root " + _root);
+ }
+ }
+ else
+ {
+ newRecords.add(record);
}
}
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Root record " + rootRecord);
- }
+ return newRecords;
+ }
- if (rootRecord != null)
+ private boolean containsCategory(Collection<Class<? extends ConfiguredObject>> childCategories, String categorySimpleName)
+ {
+ for (Class<? extends ConfiguredObject> child : childCategories)
{
-
- if (rootRecord.getParents() == null || rootRecord.getParents().isEmpty())
+ if (child.getSimpleName().equals(categorySimpleName))
{
- records = new ArrayList<ConfiguredObjectRecord>(records);
-
- String parentOfRootCategory = _parentOfRoot.getCategoryClass().getSimpleName();
- Map<String, UUID> rootParents = Collections.singletonMap(parentOfRootCategory, _parentOfRoot.getId());
- records.remove(rootRecord);
- records.add(new ConfiguredObjectRecordImpl(rootRecord.getId(), _rootCategory, rootRecord.getAttributes(), rootParents));
+ return true;
}
-
- resolveObjects(_parentOfRoot, records);
}
+ return false;
}
private void resolveObjects(ConfiguredObject<?> parentObject, List<ConfiguredObjectRecord> records)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
index cbd3f0962e..5565ea6175 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
@@ -406,6 +406,6 @@ public class VirtualHostStoreUpgraderAndRecoverer
GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders);
upgraderHandler.upgrade();
- new GenericRecoverer(_virtualHostNode, virtualHostCategory).recover(upgraderHandler.getRecords());
+ new GenericRecoverer(_virtualHostNode).recover(upgraderHandler.getRecords());
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 80d1dcf15f..707be9ed7b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -75,6 +75,7 @@ import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.GenericRecoverer;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreProvider;
import org.apache.qpid.server.store.StoreException;
@@ -82,6 +83,7 @@ import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.MapValueConverter;
@@ -341,7 +343,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
public Collection<Connection> getConnections()
{
return getChildren(Connection.class);
-
}
@Override
@@ -1317,7 +1318,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes()));
}
- @StateTransition( currentState = { State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
+ @StateTransition( currentState = { State.UNINITIALIZED }, desiredState = State.ACTIVE )
private void onActivate()
{
_houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount());
@@ -1339,9 +1340,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
MessageStoreRecoverer messageStoreRecoverer;
-
-
-
if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
{
messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
@@ -1364,7 +1362,58 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_state.set(finalState);
reportIfError(_state.get());
}
+ }
+
+ @StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
+ private void onRestart()
+ {
+ resetStatistics();
+
+ final List<ConfiguredObjectRecord> records = new ArrayList<>();
+
+ // Transitioning to STOPPED will have closed all our children. Now we are transition
+ // back to ACTIVE, we need to recover and re-open them.
+
+ getDurableConfigurationStore().visitConfiguredObjectRecords(new ConfiguredObjectRecordHandler()
+ {
+ @Override
+ public void begin()
+ {
+ }
+
+ @Override
+ public boolean handle(final ConfiguredObjectRecord record)
+ {
+ records.add(record);
+ return true;
+ }
+
+ @Override
+ public void end()
+ {
+ }
+ });
+
+ new GenericRecoverer(this).recover(records);
+
+ Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ applyToChildren(new Action<ConfiguredObject<?>>()
+ {
+ @Override
+ public void performAction(final ConfiguredObject<?> object)
+ {
+ object.open();
+ }
+ });
+ return null;
+ }
+ });
+ onActivate();
}
private class StoreUpdatingChangeListener implements ConfigurationChangeListener
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index 70fd95a608..db5cf1a7ba 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -20,34 +20,40 @@
*/
package org.apache.qpid.server.model;
+import static java.util.Arrays.asList;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.security.AccessControlException;
-import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -157,6 +163,60 @@ public class VirtualHostTest extends QpidTestCase
verify(_configStore, times(2)).update(eq(false), matchesRecord(virtualHost.getId(), virtualHost.getType()));
}
+ public void testRestartingVirtualHostRecoversChildren()
+ {
+ String virtualHostName = getName();
+
+ VirtualHost<?,?,?> virtualHost = createVirtualHost(virtualHostName);
+ assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
+ final ConfiguredObjectRecord virtualHostCor = virtualHost.asObjectRecord();
+
+ // Give virtualhost a queue and an exchange
+ Queue queue = virtualHost.createChild(Queue.class, Collections.<String, Object>singletonMap(Queue.NAME, "myQueue"));
+ final ConfiguredObjectRecord queueCor = queue.asObjectRecord();
+
+ Map<String, Object> exchangeArgs = new HashMap<>();
+ exchangeArgs.put(Exchange.NAME, "myExchange");
+ exchangeArgs.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+ Exchange exchange = virtualHost.createChild(Exchange.class, exchangeArgs);
+ final ConfiguredObjectRecord exchangeCor = exchange.asObjectRecord();
+
+ assertEquals("Unexpected number of queues before stop", 1, virtualHost.getChildren(Queue.class).size());
+ assertEquals("Unexpected number of exchanges before stop", 5, virtualHost.getChildren(Exchange.class).size());
+
+ virtualHost.stop();
+ assertEquals("Unexpected state", State.STOPPED, virtualHost.getState());
+ assertEquals("Unexpected number of queues after stop", 0, virtualHost.getChildren(Queue.class).size());
+ assertEquals("Unexpected number of exchanges after stop", 0, virtualHost.getChildren(Exchange.class).size());
+
+ // Setup an answer that will return the configured object records
+ doAnswer(new Answer()
+ {
+ final Iterator<ConfiguredObjectRecord> corIterator = asList(queueCor, exchangeCor, virtualHostCor).iterator();
+
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0];
+ boolean handlerContinue = true;
+ while(corIterator.hasNext() && handlerContinue)
+ {
+ handlerContinue = handler.handle(corIterator.next());
+ }
+
+ return null;
+ }
+ }).when(_configStore).visitConfiguredObjectRecords(any(ConfiguredObjectRecordHandler.class));
+
+ virtualHost.start();
+ assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
+
+ assertEquals("Unexpected number of queues after restart", 1, virtualHost.getChildren(Queue.class).size());
+ assertEquals("Unexpected number of exchanges after restart", 5, virtualHost.getChildren(Exchange.class).size());
+ }
+
+
public void testStopVirtualHost_ClosesConnections()
{
String virtualHostName = getName();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
index 638f21d82c..52f70e7fd6 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
@@ -325,7 +325,7 @@ public class BrokerRecovererTest extends TestCase
private void resolveObjects(ConfiguredObjectRecord... records)
{
- GenericRecoverer recoverer = new GenericRecoverer(_systemConfig, Broker.class.getSimpleName());
+ GenericRecoverer recoverer = new GenericRecoverer(_systemConfig);
recoverer.recover(Arrays.asList(records));
}