diff options
| author | Keith Wall <kwall@apache.org> | 2014-07-31 12:02:10 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-07-31 12:02:10 +0000 |
| commit | ee4496653ff9139ab13f57dfaaa081df40714a45 (patch) | |
| tree | e1dcd21bc97ae6526c6896d63a13489d553fc31a /qpid/java/broker-core | |
| parent | 660fb9713deb72f0ce35a5ca960f6e37dc14ab14 (diff) | |
| download | qpid-python-ee4496653ff9139ab13f57dfaaa081df40714a45.tar.gz | |
QPID-5926: [Java Broker] When transitioning from STOPPED to ACTIVE the virtualhost re-recovers children beneath it
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1614866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
7 files changed, 173 insertions, 39 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index e886f913c1..2755c8dba4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -574,7 +574,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - private void applyToChildren(Action<ConfiguredObject<?>> action) + protected void applyToChildren(Action<ConfiguredObject<?>> action) { for (Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass())) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java index 15a2a3cbbd..be7db10773 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java @@ -477,7 +477,7 @@ public class BrokerStoreUpgraderAndRecoverer public Broker<?> perform(final DurableConfigurationStore store) { List<ConfiguredObjectRecord> upgradedRecords = upgrade(store); - new GenericRecoverer(_systemConfig, Broker.class.getSimpleName()).recover(upgradedRecords); + new GenericRecoverer(_systemConfig).recover(upgradedRecords); final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(store); applyRecursively(_systemConfig.getBroker(), new Action<ConfiguredObject<?>>() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java index f281c58c79..58fa852849 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -40,18 +41,16 @@ public class GenericRecoverer { private static final Logger LOGGER = Logger.getLogger(GenericRecoverer.class); - private final ConfiguredObject<?> _parentOfRoot; - private final String _rootCategory; + private final ConfiguredObject<?> _root; - public GenericRecoverer(ConfiguredObject<?> parentOfRoot, String rootCategory) + public GenericRecoverer(ConfiguredObject<?> root) { - _parentOfRoot = parentOfRoot; - _rootCategory = rootCategory; + _root = root; } public void recover(final List<ConfiguredObjectRecord> records) { - _parentOfRoot.getTaskExecutor().run(new VoidTask() + _root.getTaskExecutor().run(new VoidTask() { @Override public void execute() @@ -62,44 +61,70 @@ public class GenericRecoverer @Override public String toString() { - return _rootCategory + " recovery"; + return "RecoveringChildrenOf_" + _root.getCategoryClass().getSimpleName(); } }); - } private void performRecover(List<ConfiguredObjectRecord> records) { - ConfiguredObjectRecord rootRecord = null; + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Recovering the children of " + _root); + } + + records = resolveDiscontinuity(records); + resolveObjects(_root, records); + } + + private List<ConfiguredObjectRecord> resolveDiscontinuity(final List<ConfiguredObjectRecord> records) + { + Collection<Class<? extends ConfiguredObject>> childTypesOfRoot = _root.getModel().getChildTypes(_root.getCategoryClass()); + List<ConfiguredObjectRecord> newRecords = new ArrayList<>(records.size()); + for (ConfiguredObjectRecord record : records) { - if (_rootCategory.equals(record.getType())) + if (record.getId().equals(_root.getId())) { - rootRecord = record; - break; + // If the parent is already in the records, we skip it, this supports partial recovery + // (required when restarting a virtualhost). In the long term, when the objects take responsibility + // for the recovery of immediate descendants only, this will disappear. + } + else if ((record.getParents() == null || record.getParents().size() == 0)) + { + if (containsCategory(childTypesOfRoot, record.getType())) + { + String parentOfRootCategory = _root.getCategoryClass().getSimpleName(); + Map<String, UUID> rootParents = Collections.singletonMap(parentOfRootCategory, _root.getId()); + newRecords.add(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes(), rootParents)); + } + else + { + throw new IllegalArgumentException("Recovered configured object record " + record + + " has no recorded parents and is not a valid child type" + + " [" + Arrays.toString(childTypesOfRoot.toArray()) + "]" + + " for the root " + _root); + } + } + else + { + newRecords.add(record); } } - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Root record " + rootRecord); - } + return newRecords; + } - if (rootRecord != null) + private boolean containsCategory(Collection<Class<? extends ConfiguredObject>> childCategories, String categorySimpleName) + { + for (Class<? extends ConfiguredObject> child : childCategories) { - - if (rootRecord.getParents() == null || rootRecord.getParents().isEmpty()) + if (child.getSimpleName().equals(categorySimpleName)) { - records = new ArrayList<ConfiguredObjectRecord>(records); - - String parentOfRootCategory = _parentOfRoot.getCategoryClass().getSimpleName(); - Map<String, UUID> rootParents = Collections.singletonMap(parentOfRootCategory, _parentOfRoot.getId()); - records.remove(rootRecord); - records.add(new ConfiguredObjectRecordImpl(rootRecord.getId(), _rootCategory, rootRecord.getAttributes(), rootParents)); + return true; } - - resolveObjects(_parentOfRoot, records); } + return false; } private void resolveObjects(ConfiguredObject<?> parentObject, List<ConfiguredObjectRecord> records) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java index cbd3f0962e..5565ea6175 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java @@ -406,6 +406,6 @@ public class VirtualHostStoreUpgraderAndRecoverer GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders); upgraderHandler.upgrade(); - new GenericRecoverer(_virtualHostNode, virtualHostCategory).recover(upgraderHandler.getRecords()); + new GenericRecoverer(_virtualHostNode).recover(upgraderHandler.getRecords()); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 80d1dcf15f..707be9ed7b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -75,6 +75,7 @@ import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.GenericRecoverer; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreProvider; import org.apache.qpid.server.store.StoreException; @@ -82,6 +83,7 @@ import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.MapValueConverter; @@ -341,7 +343,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte public Collection<Connection> getConnections() { return getChildren(Connection.class); - } @Override @@ -1317,7 +1318,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes())); } - @StateTransition( currentState = { State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) + @StateTransition( currentState = { State.UNINITIALIZED }, desiredState = State.ACTIVE ) private void onActivate() { _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount()); @@ -1339,9 +1340,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } MessageStoreRecoverer messageStoreRecoverer; - - - if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY)) { messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); @@ -1364,7 +1362,58 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte _state.set(finalState); reportIfError(_state.get()); } + } + + @StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) + private void onRestart() + { + resetStatistics(); + + final List<ConfiguredObjectRecord> records = new ArrayList<>(); + + // Transitioning to STOPPED will have closed all our children. Now we are transition + // back to ACTIVE, we need to recover and re-open them. + + getDurableConfigurationStore().visitConfiguredObjectRecords(new ConfiguredObjectRecordHandler() + { + @Override + public void begin() + { + } + + @Override + public boolean handle(final ConfiguredObjectRecord record) + { + records.add(record); + return true; + } + + @Override + public void end() + { + } + }); + + new GenericRecoverer(this).recover(records); + + Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + applyToChildren(new Action<ConfiguredObject<?>>() + { + @Override + public void performAction(final ConfiguredObject<?> object) + { + object.open(); + } + }); + return null; + } + }); + onActivate(); } private class StoreUpdatingChangeListener implements ConfigurationChangeListener diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index 70fd95a608..db5cf1a7ba 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -20,34 +20,40 @@ */ package org.apache.qpid.server.model; +import static java.util.Arrays.asList; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.security.AccessControlException; -import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.UUID; import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -157,6 +163,60 @@ public class VirtualHostTest extends QpidTestCase verify(_configStore, times(2)).update(eq(false), matchesRecord(virtualHost.getId(), virtualHost.getType())); } + public void testRestartingVirtualHostRecoversChildren() + { + String virtualHostName = getName(); + + VirtualHost<?,?,?> virtualHost = createVirtualHost(virtualHostName); + assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); + final ConfiguredObjectRecord virtualHostCor = virtualHost.asObjectRecord(); + + // Give virtualhost a queue and an exchange + Queue queue = virtualHost.createChild(Queue.class, Collections.<String, Object>singletonMap(Queue.NAME, "myQueue")); + final ConfiguredObjectRecord queueCor = queue.asObjectRecord(); + + Map<String, Object> exchangeArgs = new HashMap<>(); + exchangeArgs.put(Exchange.NAME, "myExchange"); + exchangeArgs.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + + Exchange exchange = virtualHost.createChild(Exchange.class, exchangeArgs); + final ConfiguredObjectRecord exchangeCor = exchange.asObjectRecord(); + + assertEquals("Unexpected number of queues before stop", 1, virtualHost.getChildren(Queue.class).size()); + assertEquals("Unexpected number of exchanges before stop", 5, virtualHost.getChildren(Exchange.class).size()); + + virtualHost.stop(); + assertEquals("Unexpected state", State.STOPPED, virtualHost.getState()); + assertEquals("Unexpected number of queues after stop", 0, virtualHost.getChildren(Queue.class).size()); + assertEquals("Unexpected number of exchanges after stop", 0, virtualHost.getChildren(Exchange.class).size()); + + // Setup an answer that will return the configured object records + doAnswer(new Answer() + { + final Iterator<ConfiguredObjectRecord> corIterator = asList(queueCor, exchangeCor, virtualHostCor).iterator(); + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0]; + boolean handlerContinue = true; + while(corIterator.hasNext() && handlerContinue) + { + handlerContinue = handler.handle(corIterator.next()); + } + + return null; + } + }).when(_configStore).visitConfiguredObjectRecords(any(ConfiguredObjectRecordHandler.class)); + + virtualHost.start(); + assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); + + assertEquals("Unexpected number of queues after restart", 1, virtualHost.getChildren(Queue.class).size()); + assertEquals("Unexpected number of exchanges after restart", 5, virtualHost.getChildren(Exchange.class).size()); + } + + public void testStopVirtualHost_ClosesConnections() { String virtualHostName = getName(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java index 638f21d82c..52f70e7fd6 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java @@ -325,7 +325,7 @@ public class BrokerRecovererTest extends TestCase private void resolveObjects(ConfiguredObjectRecord... records) { - GenericRecoverer recoverer = new GenericRecoverer(_systemConfig, Broker.class.getSimpleName()); + GenericRecoverer recoverer = new GenericRecoverer(_systemConfig); recoverer.recover(Arrays.asList(records)); } |
