From ee4496653ff9139ab13f57dfaaa081df40714a45 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Thu, 31 Jul 2014 12:02:10 +0000 Subject: QPID-5926: [Java Broker] When transitioning from STOPPED to ACTIVE the virtualhost re-recovers children beneath it git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1614866 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/model/AbstractConfiguredObject.java | 2 +- .../store/BrokerStoreUpgraderAndRecoverer.java | 2 +- .../apache/qpid/server/store/GenericRecoverer.java | 79 ++++++++++++++-------- .../VirtualHostStoreUpgraderAndRecoverer.java | 2 +- .../server/virtualhost/AbstractVirtualHost.java | 59 ++++++++++++++-- .../apache/qpid/server/model/VirtualHostTest.java | 66 +++++++++++++++++- .../qpid/server/store/BrokerRecovererTest.java | 2 +- 7 files changed, 173 insertions(+), 39 deletions(-) (limited to 'qpid/java/broker-core') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index e886f913c1..2755c8dba4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -574,7 +574,7 @@ public abstract class AbstractConfiguredObject> im } } - private void applyToChildren(Action> action) + protected void applyToChildren(Action> action) { for (Class childClass : getModel().getChildTypes(getCategoryClass())) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java index 15a2a3cbbd..be7db10773 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java @@ -477,7 +477,7 @@ public class BrokerStoreUpgraderAndRecoverer public Broker perform(final DurableConfigurationStore store) { List upgradedRecords = upgrade(store); - new GenericRecoverer(_systemConfig, Broker.class.getSimpleName()).recover(upgradedRecords); + new GenericRecoverer(_systemConfig).recover(upgradedRecords); final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(store); applyRecursively(_systemConfig.getBroker(), new Action>() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java index f281c58c79..58fa852849 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -40,18 +41,16 @@ public class GenericRecoverer { private static final Logger LOGGER = Logger.getLogger(GenericRecoverer.class); - private final ConfiguredObject _parentOfRoot; - private final String _rootCategory; + private final ConfiguredObject _root; - public GenericRecoverer(ConfiguredObject parentOfRoot, String rootCategory) + public GenericRecoverer(ConfiguredObject root) { - _parentOfRoot = parentOfRoot; - _rootCategory = rootCategory; + _root = root; } public void recover(final List records) { - _parentOfRoot.getTaskExecutor().run(new VoidTask() + _root.getTaskExecutor().run(new VoidTask() { @Override public void execute() @@ -62,44 +61,70 @@ public class GenericRecoverer @Override public String toString() { - return _rootCategory + " recovery"; + return "RecoveringChildrenOf_" + _root.getCategoryClass().getSimpleName(); } }); - } private void performRecover(List records) { - ConfiguredObjectRecord rootRecord = null; + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Recovering the children of " + _root); + } + + records = resolveDiscontinuity(records); + resolveObjects(_root, records); + } + + private List resolveDiscontinuity(final List records) + { + Collection> childTypesOfRoot = _root.getModel().getChildTypes(_root.getCategoryClass()); + List newRecords = new ArrayList<>(records.size()); + for (ConfiguredObjectRecord record : records) { - if (_rootCategory.equals(record.getType())) + if (record.getId().equals(_root.getId())) { - rootRecord = record; - break; + // If the parent is already in the records, we skip it, this supports partial recovery + // (required when restarting a virtualhost). In the long term, when the objects take responsibility + // for the recovery of immediate descendants only, this will disappear. + } + else if ((record.getParents() == null || record.getParents().size() == 0)) + { + if (containsCategory(childTypesOfRoot, record.getType())) + { + String parentOfRootCategory = _root.getCategoryClass().getSimpleName(); + Map rootParents = Collections.singletonMap(parentOfRootCategory, _root.getId()); + newRecords.add(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes(), rootParents)); + } + else + { + throw new IllegalArgumentException("Recovered configured object record " + record + + " has no recorded parents and is not a valid child type" + + " [" + Arrays.toString(childTypesOfRoot.toArray()) + "]" + + " for the root " + _root); + } + } + else + { + newRecords.add(record); } } - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Root record " + rootRecord); - } + return newRecords; + } - if (rootRecord != null) + private boolean containsCategory(Collection> childCategories, String categorySimpleName) + { + for (Class child : childCategories) { - - if (rootRecord.getParents() == null || rootRecord.getParents().isEmpty()) + if (child.getSimpleName().equals(categorySimpleName)) { - records = new ArrayList(records); - - String parentOfRootCategory = _parentOfRoot.getCategoryClass().getSimpleName(); - Map rootParents = Collections.singletonMap(parentOfRootCategory, _parentOfRoot.getId()); - records.remove(rootRecord); - records.add(new ConfiguredObjectRecordImpl(rootRecord.getId(), _rootCategory, rootRecord.getAttributes(), rootParents)); + return true; } - - resolveObjects(_parentOfRoot, records); } + return false; } private void resolveObjects(ConfiguredObject parentObject, List records) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java index cbd3f0962e..5565ea6175 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java @@ -406,6 +406,6 @@ public class VirtualHostStoreUpgraderAndRecoverer GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders); upgraderHandler.upgrade(); - new GenericRecoverer(_virtualHostNode, virtualHostCategory).recover(upgraderHandler.getRecords()); + new GenericRecoverer(_virtualHostNode).recover(upgraderHandler.getRecords()); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 80d1dcf15f..707be9ed7b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -75,6 +75,7 @@ import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.GenericRecoverer; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreProvider; import org.apache.qpid.server.store.StoreException; @@ -82,6 +83,7 @@ import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.MapValueConverter; @@ -341,7 +343,6 @@ public abstract class AbstractVirtualHost> exte public Collection getConnections() { return getChildren(Connection.class); - } @Override @@ -1317,7 +1318,7 @@ public abstract class AbstractVirtualHost> exte getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes())); } - @StateTransition( currentState = { State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) + @StateTransition( currentState = { State.UNINITIALIZED }, desiredState = State.ACTIVE ) private void onActivate() { _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount()); @@ -1339,9 +1340,6 @@ public abstract class AbstractVirtualHost> exte } MessageStoreRecoverer messageStoreRecoverer; - - - if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY)) { messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); @@ -1364,7 +1362,58 @@ public abstract class AbstractVirtualHost> exte _state.set(finalState); reportIfError(_state.get()); } + } + + @StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) + private void onRestart() + { + resetStatistics(); + + final List records = new ArrayList<>(); + + // Transitioning to STOPPED will have closed all our children. Now we are transition + // back to ACTIVE, we need to recover and re-open them. + + getDurableConfigurationStore().visitConfiguredObjectRecords(new ConfiguredObjectRecordHandler() + { + @Override + public void begin() + { + } + + @Override + public boolean handle(final ConfiguredObjectRecord record) + { + records.add(record); + return true; + } + + @Override + public void end() + { + } + }); + + new GenericRecoverer(this).recover(records); + + Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction() + { + @Override + public Object run() + { + applyToChildren(new Action>() + { + @Override + public void performAction(final ConfiguredObject object) + { + object.open(); + } + }); + return null; + } + }); + onActivate(); } private class StoreUpdatingChangeListener implements ConfigurationChangeListener diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index 70fd95a608..db5cf1a7ba 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -20,34 +20,40 @@ */ package org.apache.qpid.server.model; +import static java.util.Arrays.asList; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.security.AccessControlException; -import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.UUID; import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -157,6 +163,60 @@ public class VirtualHostTest extends QpidTestCase verify(_configStore, times(2)).update(eq(false), matchesRecord(virtualHost.getId(), virtualHost.getType())); } + public void testRestartingVirtualHostRecoversChildren() + { + String virtualHostName = getName(); + + VirtualHost virtualHost = createVirtualHost(virtualHostName); + assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); + final ConfiguredObjectRecord virtualHostCor = virtualHost.asObjectRecord(); + + // Give virtualhost a queue and an exchange + Queue queue = virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, "myQueue")); + final ConfiguredObjectRecord queueCor = queue.asObjectRecord(); + + Map exchangeArgs = new HashMap<>(); + exchangeArgs.put(Exchange.NAME, "myExchange"); + exchangeArgs.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + + Exchange exchange = virtualHost.createChild(Exchange.class, exchangeArgs); + final ConfiguredObjectRecord exchangeCor = exchange.asObjectRecord(); + + assertEquals("Unexpected number of queues before stop", 1, virtualHost.getChildren(Queue.class).size()); + assertEquals("Unexpected number of exchanges before stop", 5, virtualHost.getChildren(Exchange.class).size()); + + virtualHost.stop(); + assertEquals("Unexpected state", State.STOPPED, virtualHost.getState()); + assertEquals("Unexpected number of queues after stop", 0, virtualHost.getChildren(Queue.class).size()); + assertEquals("Unexpected number of exchanges after stop", 0, virtualHost.getChildren(Exchange.class).size()); + + // Setup an answer that will return the configured object records + doAnswer(new Answer() + { + final Iterator corIterator = asList(queueCor, exchangeCor, virtualHostCor).iterator(); + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0]; + boolean handlerContinue = true; + while(corIterator.hasNext() && handlerContinue) + { + handlerContinue = handler.handle(corIterator.next()); + } + + return null; + } + }).when(_configStore).visitConfiguredObjectRecords(any(ConfiguredObjectRecordHandler.class)); + + virtualHost.start(); + assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); + + assertEquals("Unexpected number of queues after restart", 1, virtualHost.getChildren(Queue.class).size()); + assertEquals("Unexpected number of exchanges after restart", 5, virtualHost.getChildren(Exchange.class).size()); + } + + public void testStopVirtualHost_ClosesConnections() { String virtualHostName = getName(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java index 638f21d82c..52f70e7fd6 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java @@ -325,7 +325,7 @@ public class BrokerRecovererTest extends TestCase private void resolveObjects(ConfiguredObjectRecord... records) { - GenericRecoverer recoverer = new GenericRecoverer(_systemConfig, Broker.class.getSimpleName()); + GenericRecoverer recoverer = new GenericRecoverer(_systemConfig); recoverer.recover(Arrays.asList(records)); } -- cgit v1.2.1