diff options
| author | Keith Wall <kwall@apache.org> | 2014-07-31 12:02:10 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-07-31 12:02:10 +0000 |
| commit | ee4496653ff9139ab13f57dfaaa081df40714a45 (patch) | |
| tree | e1dcd21bc97ae6526c6896d63a13489d553fc31a /qpid | |
| parent | 660fb9713deb72f0ce35a5ca960f6e37dc14ab14 (diff) | |
| download | qpid-python-ee4496653ff9139ab13f57dfaaa081df40714a45.tar.gz | |
QPID-5926: [Java Broker] When transitioning from STOPPED to ACTIVE the virtualhost re-recovers children beneath it
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1614866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
11 files changed, 265 insertions, 54 deletions
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java index 80d940bc1b..e5d631b452 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java @@ -29,11 +29,14 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import javax.servlet.http.HttpServletResponse; + import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.systest.rest.Asserts; import org.apache.qpid.systest.rest.QpidRestTestCase; import org.apache.qpid.test.utils.TestBrokerConfiguration; import org.apache.qpid.util.FileUtils; @@ -99,7 +102,7 @@ public class BDBHAVirtualHostRestTest extends QpidRestTestCase assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)); Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC"); - getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK); hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)); @@ -111,9 +114,36 @@ public class BDBHAVirtualHostRestTest extends QpidRestTestCase assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY)); Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC"); - getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK); hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY)); } + + public void testMutateState() throws Exception + { + waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE"); + assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE"); + + Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED"); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); + + waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED"); + assertActualAndDesireStates(_virtualhostUrl, "STOPPED", "STOPPED"); + + newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE"); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); + + waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE"); + assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE"); + } + + private void assertActualAndDesireStates(final String restUrl, + final String expectedDesiredState, + final String expectedActualState) throws IOException + { + Map<String, Object> virtualhost = getRestTestHelper().getJsonAsSingletonList(restUrl); + Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, virtualhost); + } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index e886f913c1..2755c8dba4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -574,7 +574,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - private void applyToChildren(Action<ConfiguredObject<?>> action) + protected void applyToChildren(Action<ConfiguredObject<?>> action) { for (Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass())) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java index 15a2a3cbbd..be7db10773 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java @@ -477,7 +477,7 @@ public class BrokerStoreUpgraderAndRecoverer public Broker<?> perform(final DurableConfigurationStore store) { List<ConfiguredObjectRecord> upgradedRecords = upgrade(store); - new GenericRecoverer(_systemConfig, Broker.class.getSimpleName()).recover(upgradedRecords); + new GenericRecoverer(_systemConfig).recover(upgradedRecords); final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(store); applyRecursively(_systemConfig.getBroker(), new Action<ConfiguredObject<?>>() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java index f281c58c79..58fa852849 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -40,18 +41,16 @@ public class GenericRecoverer { private static final Logger LOGGER = Logger.getLogger(GenericRecoverer.class); - private final ConfiguredObject<?> _parentOfRoot; - private final String _rootCategory; + private final ConfiguredObject<?> _root; - public GenericRecoverer(ConfiguredObject<?> parentOfRoot, String rootCategory) + public GenericRecoverer(ConfiguredObject<?> root) { - _parentOfRoot = parentOfRoot; - _rootCategory = rootCategory; + _root = root; } public void recover(final List<ConfiguredObjectRecord> records) { - _parentOfRoot.getTaskExecutor().run(new VoidTask() + _root.getTaskExecutor().run(new VoidTask() { @Override public void execute() @@ -62,44 +61,70 @@ public class GenericRecoverer @Override public String toString() { - return _rootCategory + " recovery"; + return "RecoveringChildrenOf_" + _root.getCategoryClass().getSimpleName(); } }); - } private void performRecover(List<ConfiguredObjectRecord> records) { - ConfiguredObjectRecord rootRecord = null; + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Recovering the children of " + _root); + } + + records = resolveDiscontinuity(records); + resolveObjects(_root, records); + } + + private List<ConfiguredObjectRecord> resolveDiscontinuity(final List<ConfiguredObjectRecord> records) + { + Collection<Class<? extends ConfiguredObject>> childTypesOfRoot = _root.getModel().getChildTypes(_root.getCategoryClass()); + List<ConfiguredObjectRecord> newRecords = new ArrayList<>(records.size()); + for (ConfiguredObjectRecord record : records) { - if (_rootCategory.equals(record.getType())) + if (record.getId().equals(_root.getId())) { - rootRecord = record; - break; + // If the parent is already in the records, we skip it, this supports partial recovery + // (required when restarting a virtualhost). In the long term, when the objects take responsibility + // for the recovery of immediate descendants only, this will disappear. + } + else if ((record.getParents() == null || record.getParents().size() == 0)) + { + if (containsCategory(childTypesOfRoot, record.getType())) + { + String parentOfRootCategory = _root.getCategoryClass().getSimpleName(); + Map<String, UUID> rootParents = Collections.singletonMap(parentOfRootCategory, _root.getId()); + newRecords.add(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes(), rootParents)); + } + else + { + throw new IllegalArgumentException("Recovered configured object record " + record + + " has no recorded parents and is not a valid child type" + + " [" + Arrays.toString(childTypesOfRoot.toArray()) + "]" + + " for the root " + _root); + } + } + else + { + newRecords.add(record); } } - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Root record " + rootRecord); - } + return newRecords; + } - if (rootRecord != null) + private boolean containsCategory(Collection<Class<? extends ConfiguredObject>> childCategories, String categorySimpleName) + { + for (Class<? extends ConfiguredObject> child : childCategories) { - - if (rootRecord.getParents() == null || rootRecord.getParents().isEmpty()) + if (child.getSimpleName().equals(categorySimpleName)) { - records = new ArrayList<ConfiguredObjectRecord>(records); - - String parentOfRootCategory = _parentOfRoot.getCategoryClass().getSimpleName(); - Map<String, UUID> rootParents = Collections.singletonMap(parentOfRootCategory, _parentOfRoot.getId()); - records.remove(rootRecord); - records.add(new ConfiguredObjectRecordImpl(rootRecord.getId(), _rootCategory, rootRecord.getAttributes(), rootParents)); + return true; } - - resolveObjects(_parentOfRoot, records); } + return false; } private void resolveObjects(ConfiguredObject<?> parentObject, List<ConfiguredObjectRecord> records) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java index cbd3f0962e..5565ea6175 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java @@ -406,6 +406,6 @@ public class VirtualHostStoreUpgraderAndRecoverer GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders); upgraderHandler.upgrade(); - new GenericRecoverer(_virtualHostNode, virtualHostCategory).recover(upgraderHandler.getRecords()); + new GenericRecoverer(_virtualHostNode).recover(upgraderHandler.getRecords()); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 80d1dcf15f..707be9ed7b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -75,6 +75,7 @@ import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.GenericRecoverer; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreProvider; import org.apache.qpid.server.store.StoreException; @@ -82,6 +83,7 @@ import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.MapValueConverter; @@ -341,7 +343,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte public Collection<Connection> getConnections() { return getChildren(Connection.class); - } @Override @@ -1317,7 +1318,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes())); } - @StateTransition( currentState = { State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) + @StateTransition( currentState = { State.UNINITIALIZED }, desiredState = State.ACTIVE ) private void onActivate() { _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount()); @@ -1339,9 +1340,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } MessageStoreRecoverer messageStoreRecoverer; - - - if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY)) { messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); @@ -1364,7 +1362,58 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte _state.set(finalState); reportIfError(_state.get()); } + } + + @StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) + private void onRestart() + { + resetStatistics(); + + final List<ConfiguredObjectRecord> records = new ArrayList<>(); + + // Transitioning to STOPPED will have closed all our children. Now we are transition + // back to ACTIVE, we need to recover and re-open them. + + getDurableConfigurationStore().visitConfiguredObjectRecords(new ConfiguredObjectRecordHandler() + { + @Override + public void begin() + { + } + + @Override + public boolean handle(final ConfiguredObjectRecord record) + { + records.add(record); + return true; + } + + @Override + public void end() + { + } + }); + + new GenericRecoverer(this).recover(records); + + Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + applyToChildren(new Action<ConfiguredObject<?>>() + { + @Override + public void performAction(final ConfiguredObject<?> object) + { + object.open(); + } + }); + return null; + } + }); + onActivate(); } private class StoreUpdatingChangeListener implements ConfigurationChangeListener diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index 70fd95a608..db5cf1a7ba 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -20,34 +20,40 @@ */ package org.apache.qpid.server.model; +import static java.util.Arrays.asList; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.security.AccessControlException; -import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.UUID; import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -157,6 +163,60 @@ public class VirtualHostTest extends QpidTestCase verify(_configStore, times(2)).update(eq(false), matchesRecord(virtualHost.getId(), virtualHost.getType())); } + public void testRestartingVirtualHostRecoversChildren() + { + String virtualHostName = getName(); + + VirtualHost<?,?,?> virtualHost = createVirtualHost(virtualHostName); + assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); + final ConfiguredObjectRecord virtualHostCor = virtualHost.asObjectRecord(); + + // Give virtualhost a queue and an exchange + Queue queue = virtualHost.createChild(Queue.class, Collections.<String, Object>singletonMap(Queue.NAME, "myQueue")); + final ConfiguredObjectRecord queueCor = queue.asObjectRecord(); + + Map<String, Object> exchangeArgs = new HashMap<>(); + exchangeArgs.put(Exchange.NAME, "myExchange"); + exchangeArgs.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + + Exchange exchange = virtualHost.createChild(Exchange.class, exchangeArgs); + final ConfiguredObjectRecord exchangeCor = exchange.asObjectRecord(); + + assertEquals("Unexpected number of queues before stop", 1, virtualHost.getChildren(Queue.class).size()); + assertEquals("Unexpected number of exchanges before stop", 5, virtualHost.getChildren(Exchange.class).size()); + + virtualHost.stop(); + assertEquals("Unexpected state", State.STOPPED, virtualHost.getState()); + assertEquals("Unexpected number of queues after stop", 0, virtualHost.getChildren(Queue.class).size()); + assertEquals("Unexpected number of exchanges after stop", 0, virtualHost.getChildren(Exchange.class).size()); + + // Setup an answer that will return the configured object records + doAnswer(new Answer() + { + final Iterator<ConfiguredObjectRecord> corIterator = asList(queueCor, exchangeCor, virtualHostCor).iterator(); + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0]; + boolean handlerContinue = true; + while(corIterator.hasNext() && handlerContinue) + { + handlerContinue = handler.handle(corIterator.next()); + } + + return null; + } + }).when(_configStore).visitConfiguredObjectRecords(any(ConfiguredObjectRecordHandler.class)); + + virtualHost.start(); + assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); + + assertEquals("Unexpected number of queues after restart", 1, virtualHost.getChildren(Queue.class).size()); + assertEquals("Unexpected number of exchanges after restart", 5, virtualHost.getChildren(Exchange.class).size()); + } + + public void testStopVirtualHost_ClosesConnections() { String virtualHostName = getName(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java index 638f21d82c..52f70e7fd6 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java @@ -325,7 +325,7 @@ public class BrokerRecovererTest extends TestCase private void resolveObjects(ConfiguredObjectRecord... records) { - GenericRecoverer recoverer = new GenericRecoverer(_systemConfig, Broker.class.getSimpleName()); + GenericRecoverer recoverer = new GenericRecoverer(_systemConfig); recoverer.recover(Arrays.asList(records)); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java index 2da4a21b2d..df87432344 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java @@ -112,14 +112,15 @@ public class QpidRestTestCase extends QpidBrokerTestCase public Map<String, Object> waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception { List<Map<String, Object>> nodeAttributes = getRestTestHelper().getJsonAsList(url); - long limit = System.currentTimeMillis() + 5000; + int timeout = 5000; + long limit = System.currentTimeMillis() + timeout; while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName)))) { Thread.sleep(100l); nodeAttributes = getRestTestHelper().getJsonAsList(url); } Map<String, Object> nodeData = nodeAttributes.get(0); - assertEquals("Unexpected attribute " + attributeName, newValue, nodeData.get(attributeName)); + assertEquals("Attribute " + attributeName + " did not reach expected value within permitted timeout " + timeout + "ms.", newValue, nodeData.get(attributeName)); return nodeData; } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java index da89772a22..243b93e798 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java @@ -27,12 +27,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.Session; import javax.servlet.http.HttpServletResponse; -import org.codehaus.jackson.JsonGenerationException; -import org.codehaus.jackson.map.JsonMappingException; - import org.apache.qpid.server.virtualhost.ProvidedStoreVirtualHostImpl; import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode; import org.apache.qpid.client.AMQConnection; @@ -163,22 +162,60 @@ public class VirtualHostRestTest extends QpidRestTestCase public void testMutateState() throws Exception { - String hostToUpdate = TEST3_VIRTUALHOST; - String restHostUrl = "virtualhost/" + hostToUpdate + "/" + hostToUpdate; + String restHostUrl = "virtualhost/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST; + waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE"); assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE"); Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED"); getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); + waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED"); assertActualAndDesireStates(restHostUrl, "STOPPED", "STOPPED"); newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE"); getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); + waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE"); + assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE"); } + public void testMutateStateOfVirtualHostWithQueuesAndMessages() throws Exception + { + String testQueueName = getTestQueueName(); + String restHostUrl = "virtualhost/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST; + String restQueueUrl = "queue/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST + "/" + testQueueName; + + waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE"); + assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE"); + + Connection connection = getConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination dest = session.createQueue(testQueueName); + session.createConsumer(dest).close(); + session.createProducer(dest).send(session.createTextMessage("My test message")); + session.commit(); + connection.close(); + + assertQueueDepth(restQueueUrl, "Unexpected number of messages before stopped", 1); + + Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED"); + getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); + + waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED"); + assertActualAndDesireStates(restHostUrl, "STOPPED", "STOPPED"); + + newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE"); + getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); + + waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE"); + + assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE"); + + assertQueueDepth(restQueueUrl, "Unexpected number of messages after restart", 1); + } + public void testRecoverVirtualHostInDesiredStateStoppedWithDescription() throws Exception { String hostToUpdate = TEST3_VIRTUALHOST; @@ -502,15 +539,13 @@ public class VirtualHostRestTest extends QpidRestTestCase assertEquals("Unexpected response code", 201, statusCode); } - private void createQueue(String queueName, String queueType, Map<String, Object> attributes) throws IOException, - JsonGenerationException, JsonMappingException + private void createQueue(String queueName, String queueType, Map<String, Object> attributes) throws Exception { int responseCode = tryCreateQueue(queueName, queueType, attributes); assertEquals("Unexpected response code", 201, responseCode); } - private int tryCreateQueue(String queueName, String queueType, Map<String, Object> attributes) throws IOException, - JsonGenerationException, JsonMappingException + private int tryCreateQueue(String queueName, String queueType, Map<String, Object> attributes) throws Exception { Map<String, Object> queueData = new HashMap<String, Object>(); queueData.put(Queue.NAME, queueName); @@ -580,11 +615,21 @@ public class VirtualHostRestTest extends QpidRestTestCase } private void assertActualAndDesireStates(final String restUrl, - final String expectedDesiredState, - final String expectedActualState) throws IOException + final String expectedDesiredState, + final String expectedActualState) throws IOException { Map<String, Object> virtualhost = getRestTestHelper().getJsonAsSingletonList(restUrl); Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, virtualhost); } + private void assertQueueDepth(String restQueueUrl, String message, int expectedDepth) throws IOException + { + Map<String, Object> queueDetails = getRestTestHelper().getJsonAsSingletonList(restQueueUrl); + assertNotNull(queueDetails); + Map<String, Object> statistics = (Map<String, Object>) queueDetails.get(Asserts.STATISTICS_ATTRIBUTE); + assertNotNull(statistics); + + assertEquals(message, expectedDepth, statistics.get("queueDepthMessages")); + } + } diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index ff4485d599..3bfeba92b3 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -57,6 +57,7 @@ org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testWhenBrokerIsRestartedA org.apache.qpid.systest.rest.VirtualHostRestTest#testPutCreateVirtualHostUsingProfileNodeType org.apache.qpid.systest.rest.VirtualHostRestTest#testRecoverVirtualHostInDesiredStateStoppedWithDescription +org.apache.qpid.systest.rest.VirtualHostRestTest#testMutateStateOfVirtualHostWithQueuesAndMessages org.apache.qpid.systest.rest.VirtualHostNodeRestTest#testCreateAndDeleteVirtualHostNode |
