From fcc3f654b60b7dd2180afe73e8809545725b41af Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Tue, 25 Mar 2014 10:07:21 +0000 Subject: Introduction of separate lifecycle methods on stores for open/close/recover. Change Upgraders responsibility to create the amqp standard exchanges when upgrading from earlier store versions. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1581288 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/store/QuotaMessageStore.java | 22 ++-- .../apache/qpid/server/store/SlowMessageStore.java | 125 +++++++++++--------- .../apache/qpid/server/store/SplitStoreTest.java | 130 +++++++++++++++++++++ 3 files changed, 208 insertions(+), 69 deletions(-) create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java (limited to 'qpid/java/systests/src') diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java index 7017ea6d45..06b7091a47 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.model.VirtualHost; public class QuotaMessageStore extends NullMessageStore @@ -49,9 +48,15 @@ public class } @Override - public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) + public void openConfigurationStore(String virtualHostName, Map storeSettings) { - Map messageStoreSettings = virtualHost.getMessageStoreSettings(); + + } + + @Override + public void openMessageStore(String virtualHostName, Map messageStoreSettings) + { + _stateManager.attainState(State.INITIALISING); Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); _persistentSizeHighThreshold = overfullAttr == null ? Long.MAX_VALUE @@ -72,18 +77,11 @@ public class { _persistentSizeLowThreshold = _persistentSizeHighThreshold; } - _stateManager.attainState(State.INITIALISING); - } - - @Override - public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler) - { _stateManager.attainState(State.INITIALISED); } @Override - public void activate() + public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) { _stateManager.attainState(State.ACTIVATING); _stateManager.attainState(State.ACTIVE); @@ -153,7 +151,7 @@ public class } @Override - public void close() + public void closeMessageStore() { if (_closed.compareAndSet(false, true)) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 84d1a9cb1c..44a5252355 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; import org.apache.qpid.server.plugin.MessageStoreFactory; public class SlowMessageStore implements MessageStore, DurableConfigurationStore @@ -47,51 +47,26 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore private HashMap _preDelays = new HashMap(); private HashMap _postDelays = new HashMap(); private long _defaultDelay = 0L; - private MessageStore _realStore = null; - private DurableConfigurationStore _durableConfigurationStore = null; + private MessageStore _realMessageStore = null; + private DurableConfigurationStore _realDurableConfigurationStore = null; private Map _eventListeners = new ConcurrentHashMap(); - // ***** MessageStore Interface. - @Override - public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) + public void openConfigurationStore(String virtualHostName, Map storeSettings) { - _logger.info("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName()); - - Map messageStoreSettings = virtualHost.getMessageStoreSettings(); - Object delaysAttr = messageStoreSettings.get(DELAYS); - - @SuppressWarnings({ "unchecked" }) - Map delays = (delaysAttr instanceof Map) ? (Map) delaysAttr : Collections.emptyMap(); - configureDelays(delays); - - final Object realStoreAttr = messageStoreSettings.get(REAL_STORE); - String messageStoreType = realStoreAttr == null ? MemoryMessageStore.TYPE : realStoreAttr.toString(); - - if (delays.containsKey(DEFAULT_DELAY)) - { - _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY))); - } - - _realStore = MessageStoreFactory.FACTORY_LOADER.get(messageStoreType).createMessageStore(); - - if (!_eventListeners.isEmpty()) - { - for (Iterator> it = _eventListeners.entrySet().iterator(); it.hasNext();) - { - Map.Entry entry = it.next(); - _realStore.addEventListener(entry.getKey(), entry.getValue()); - it.remove(); - } - } - - if (_realStore instanceof DurableConfigurationStore) + if (storeSettings != null && storeSettings.get(REAL_STORE) != null) { - _durableConfigurationStore = (DurableConfigurationStore)_realStore; - _durableConfigurationStore.configureConfigStore(virtualHost, recoveryHandler); + final String realStore = (String) storeSettings.get(REAL_STORE); + _realDurableConfigurationStore = new DurableConfigurationStoreCreator().createMessageStore(realStore); + _realDurableConfigurationStore.openConfigurationStore(virtualHostName, storeSettings); } + } + @Override + public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) + { + _realDurableConfigurationStore.recoverConfigurationStore(recoveryHandler); } private void configureDelays(Map delays) @@ -159,31 +134,67 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler) + public void openMessageStore(String virtualHostName, Map messageStoreSettings) { - _realStore.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler); + Object delaysAttr = messageStoreSettings.get(DELAYS); + + @SuppressWarnings({ "unchecked" }) + Map delays = (delaysAttr instanceof Map) ? (Map) delaysAttr : Collections.emptyMap(); + configureDelays(delays); + + if (delays.containsKey(DEFAULT_DELAY)) + { + _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY))); + } + + final Object realStoreAttr = messageStoreSettings.get(REAL_STORE) == null ? MemoryMessageStore.TYPE : messageStoreSettings.get(REAL_STORE); + final String realStore = (String) realStoreAttr; + _realMessageStore = MessageStoreFactory.FACTORY_LOADER.get(realStore).createMessageStore(); + + if (!_eventListeners.isEmpty()) + { + for (Iterator> it = _eventListeners.entrySet().iterator(); it.hasNext();) + { + Map.Entry entry = it.next(); + _realMessageStore.addEventListener(entry.getKey(), entry.getValue()); + it.remove(); + } + } + _realMessageStore.openMessageStore(virtualHostName, messageStoreSettings); + + if (_realDurableConfigurationStore == null) + { + _realDurableConfigurationStore = (DurableConfigurationStore) _realMessageStore; + } + } @Override - public void close() + public void closeMessageStore() { doPreDelay("close"); - _realStore.close(); + _realMessageStore.closeMessageStore(); doPostDelay("close"); } + @Override + public void closeConfigurationStore() + { + _realDurableConfigurationStore.closeConfigurationStore(); + } + + @Override public StoredMessage addMessage(M metaData) { - return _realStore.addMessage(metaData); + return _realMessageStore.addMessage(metaData); } @Override public void create(UUID id, String type, Map attributes) throws StoreException { doPreDelay("create"); - _durableConfigurationStore.create(id, type, attributes); + _realDurableConfigurationStore.create(id, type, attributes); doPostDelay("create"); } @@ -191,7 +202,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public void remove(UUID id, String type) throws StoreException { doPreDelay("remove"); - _durableConfigurationStore.remove(id, type); + _realDurableConfigurationStore.remove(id, type); doPostDelay("remove"); } @@ -199,7 +210,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException { doPreDelay("remove"); - UUID[] removed = _durableConfigurationStore.removeConfiguredObjects(objects); + UUID[] removed = _realDurableConfigurationStore.removeConfiguredObjects(objects); doPostDelay("remove"); return removed; } @@ -208,7 +219,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public void update(UUID id, String type, Map attributes) throws StoreException { doPreDelay("update"); - _durableConfigurationStore.update(id, type, attributes); + _realDurableConfigurationStore.update(id, type, attributes); doPostDelay("update"); } @@ -216,7 +227,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException { doPreDelay("update"); - _durableConfigurationStore.update(createIfNecessary, records); + _realDurableConfigurationStore.update(createIfNecessary, records); doPostDelay("update"); } @@ -224,7 +235,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public Transaction newTransaction() { doPreDelay("beginTran"); - Transaction txn = new SlowTransaction(_realStore.newTransaction()); + Transaction txn = new SlowTransaction(_realMessageStore.newTransaction()); doPostDelay("beginTran"); return txn; } @@ -232,7 +243,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore @Override public boolean isPersistent() { - return _realStore.isPersistent(); + return _realMessageStore.isPersistent(); } private class SlowTransaction implements Transaction @@ -299,28 +310,28 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void activate() + public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) { - _realStore.activate(); + _realMessageStore.recoverMessageStore(messageRecoveryHandler, transactionLogRecoveryHandler); } @Override public void addEventListener(EventListener eventListener, Event... events) { - if (_realStore == null) + if (_realMessageStore == null) { _eventListeners .put(eventListener, events); } else { - _realStore.addEventListener(eventListener, events); + _realMessageStore.addEventListener(eventListener, events); } } @Override public String getStoreLocation() { - return _realStore.getStoreLocation(); + return _realMessageStore.getStoreLocation(); } @Override @@ -332,7 +343,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore @Override public void onDelete() { - _realStore.onDelete(); + _realMessageStore.onDelete(); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java new file mode 100644 index 0000000000..9f244e78a4 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.util.FileUtils; + +public class SplitStoreTest extends QpidBrokerTestCase +{ + private String _messageStorePath; + private String _configStorePath; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + String virtualHostWorkDir = System.getProperty("QPID_WORK") + File.separator + TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST + File.separator; + _messageStorePath = virtualHostWorkDir + "messageStore"; + _configStorePath = virtualHostWorkDir + "configStore"; + } + + @Override + public void startBroker() throws Exception + { + // Overridden to prevent QBTC starting the Broker. + } + + public void testJsonConfigurationStoreWithPersistentMessageStore() throws Exception + { + Map configurationStoreSettings = new HashMap(); + configurationStoreSettings.put(DurableConfigurationStore.STORE_TYPE, JsonFileConfigStore.TYPE); + configurationStoreSettings.put(DurableConfigurationStore.STORE_PATH, _configStorePath); + + doTest(configurationStoreSettings); + } + + public void testSeparateConfigurationAndMessageStoresOfTheSameType() throws Exception + { + Map configurationStoreSettings = new HashMap(); + configurationStoreSettings.put(DurableConfigurationStore.STORE_TYPE, getTestProfileMessageStoreType()); + configurationStoreSettings.put(DurableConfigurationStore.STORE_PATH, _configStorePath); + + doTest(configurationStoreSettings); + } + + private void configureAndStartBroker(Map configurationStoreSettings) throws Exception + { + Map messageStoreSettings = new HashMap(); + messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType()); + messageStoreSettings.put(MessageStore.STORE_PATH, _messageStorePath); + + TestBrokerConfiguration config = getBrokerConfiguration(); + config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings); + config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.CONFIGURATION_STORE_SETTINGS, configurationStoreSettings); + + super.startBroker(); + } + + private void doTest(Map configurationStoreSettings) throws Exception + { + configureAndStartBroker(configurationStoreSettings); + + Connection connection = getConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(getTestQueueName()); + session.createConsumer(queue).close(); // Create durable queue by side effect + sendMessage(session, queue, 2); + connection.close(); + + restartBroker(); + + setTestSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false"); + connection = getConnection(); + connection.start(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(1000); + session.commit(); + + assertNotNull("Message was not received after first restart", message); + assertEquals("Unexpected message received after first restart", 0, message.getIntProperty(INDEX)); + + stopBroker(); + File messageStoreFile = new File(_messageStorePath); + FileUtils.delete(messageStoreFile, true); + assertFalse("Store folder was not deleted", messageStoreFile.exists()); + super.startBroker(); + + connection = getConnection(); + connection.start(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + consumer = session.createConsumer(queue); + message = consumer.receive(500); + + assertNull("Message was received after store removal", message); + } + +} -- cgit v1.2.1