summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-03-25 10:07:21 +0000
committerKeith Wall <kwall@apache.org>2014-03-25 10:07:21 +0000
commitfcc3f654b60b7dd2180afe73e8809545725b41af (patch)
treeef5f631412fdbfd5a6ff4232782e90692e3767c1 /qpid/java/systests/src
parentb0e9d446fd5edc23267e2aa924a703749bdb95df (diff)
downloadqpid-python-fcc3f654b60b7dd2180afe73e8809545725b41af.tar.gz
Introduction of separate lifecycle methods on stores for open/close/recover.
Change Upgraders responsibility to create the amqp standard exchanges when upgrading from earlier store versions. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1581288 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java125
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java130
3 files changed, 208 insertions, 69 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
index 7017ea6d45..06b7091a47 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.model.VirtualHost;
public class
QuotaMessageStore extends NullMessageStore
@@ -49,9 +48,15 @@ public class
}
@Override
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
+ public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
{
- Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
+
+ }
+
+ @Override
+ public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
+ {
+ _stateManager.attainState(State.INITIALISING);
Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
_persistentSizeHighThreshold = overfullAttr == null
? Long.MAX_VALUE
@@ -72,18 +77,11 @@ public class
{
_persistentSizeLowThreshold = _persistentSizeHighThreshold;
}
- _stateManager.attainState(State.INITIALISING);
- }
-
- @Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
- {
_stateManager.attainState(State.INITIALISED);
}
@Override
- public void activate()
+ public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
{
_stateManager.attainState(State.ACTIVATING);
_stateManager.attainState(State.ACTIVE);
@@ -153,7 +151,7 @@ public class
}
@Override
- public void close()
+ public void closeMessageStore()
{
if (_closed.compareAndSet(false, true))
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index 84d1a9cb1c..44a5252355 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
import org.apache.qpid.server.plugin.MessageStoreFactory;
public class SlowMessageStore implements MessageStore, DurableConfigurationStore
@@ -47,51 +47,26 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
private HashMap<String, Long> _preDelays = new HashMap<String, Long>();
private HashMap<String, Long> _postDelays = new HashMap<String, Long>();
private long _defaultDelay = 0L;
- private MessageStore _realStore = null;
- private DurableConfigurationStore _durableConfigurationStore = null;
+ private MessageStore _realMessageStore = null;
+ private DurableConfigurationStore _realDurableConfigurationStore = null;
private Map<EventListener, Event[]> _eventListeners = new ConcurrentHashMap<EventListener, Event[]>();
- // ***** MessageStore Interface.
-
@Override
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
+ public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
{
- _logger.info("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName());
-
- Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
- Object delaysAttr = messageStoreSettings.get(DELAYS);
-
- @SuppressWarnings({ "unchecked" })
- Map<String,Object> delays = (delaysAttr instanceof Map) ? (Map<String,Object>) delaysAttr : Collections.<String,Object>emptyMap();
- configureDelays(delays);
-
- final Object realStoreAttr = messageStoreSettings.get(REAL_STORE);
- String messageStoreType = realStoreAttr == null ? MemoryMessageStore.TYPE : realStoreAttr.toString();
-
- if (delays.containsKey(DEFAULT_DELAY))
- {
- _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY)));
- }
-
- _realStore = MessageStoreFactory.FACTORY_LOADER.get(messageStoreType).createMessageStore();
-
- if (!_eventListeners.isEmpty())
- {
- for (Iterator<Map.Entry<EventListener, Event[]>> it = _eventListeners.entrySet().iterator(); it.hasNext();)
- {
- Map.Entry<EventListener, Event[]> entry = it.next();
- _realStore.addEventListener(entry.getKey(), entry.getValue());
- it.remove();
- }
- }
-
- if (_realStore instanceof DurableConfigurationStore)
+ if (storeSettings != null && storeSettings.get(REAL_STORE) != null)
{
- _durableConfigurationStore = (DurableConfigurationStore)_realStore;
- _durableConfigurationStore.configureConfigStore(virtualHost, recoveryHandler);
+ final String realStore = (String) storeSettings.get(REAL_STORE);
+ _realDurableConfigurationStore = new DurableConfigurationStoreCreator().createMessageStore(realStore);
+ _realDurableConfigurationStore.openConfigurationStore(virtualHostName, storeSettings);
}
+ }
+ @Override
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+ {
+ _realDurableConfigurationStore.recoverConfigurationStore(recoveryHandler);
}
private void configureDelays(Map<String, Object> delays)
@@ -159,31 +134,67 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
+ public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
{
- _realStore.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler);
+ Object delaysAttr = messageStoreSettings.get(DELAYS);
+
+ @SuppressWarnings({ "unchecked" })
+ Map<String,Object> delays = (delaysAttr instanceof Map) ? (Map<String,Object>) delaysAttr : Collections.<String,Object>emptyMap();
+ configureDelays(delays);
+
+ if (delays.containsKey(DEFAULT_DELAY))
+ {
+ _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY)));
+ }
+
+ final Object realStoreAttr = messageStoreSettings.get(REAL_STORE) == null ? MemoryMessageStore.TYPE : messageStoreSettings.get(REAL_STORE);
+ final String realStore = (String) realStoreAttr;
+ _realMessageStore = MessageStoreFactory.FACTORY_LOADER.get(realStore).createMessageStore();
+
+ if (!_eventListeners.isEmpty())
+ {
+ for (Iterator<Map.Entry<EventListener, Event[]>> it = _eventListeners.entrySet().iterator(); it.hasNext();)
+ {
+ Map.Entry<EventListener, Event[]> entry = it.next();
+ _realMessageStore.addEventListener(entry.getKey(), entry.getValue());
+ it.remove();
+ }
+ }
+ _realMessageStore.openMessageStore(virtualHostName, messageStoreSettings);
+
+ if (_realDurableConfigurationStore == null)
+ {
+ _realDurableConfigurationStore = (DurableConfigurationStore) _realMessageStore;
+ }
+
}
@Override
- public void close()
+ public void closeMessageStore()
{
doPreDelay("close");
- _realStore.close();
+ _realMessageStore.closeMessageStore();
doPostDelay("close");
}
@Override
+ public void closeConfigurationStore()
+ {
+ _realDurableConfigurationStore.closeConfigurationStore();
+ }
+
+
+ @Override
public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
{
- return _realStore.addMessage(metaData);
+ return _realMessageStore.addMessage(metaData);
}
@Override
public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException
{
doPreDelay("create");
- _durableConfigurationStore.create(id, type, attributes);
+ _realDurableConfigurationStore.create(id, type, attributes);
doPostDelay("create");
}
@@ -191,7 +202,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
public void remove(UUID id, String type) throws StoreException
{
doPreDelay("remove");
- _durableConfigurationStore.remove(id, type);
+ _realDurableConfigurationStore.remove(id, type);
doPostDelay("remove");
}
@@ -199,7 +210,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
public UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException
{
doPreDelay("remove");
- UUID[] removed = _durableConfigurationStore.removeConfiguredObjects(objects);
+ UUID[] removed = _realDurableConfigurationStore.removeConfiguredObjects(objects);
doPostDelay("remove");
return removed;
}
@@ -208,7 +219,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException
{
doPreDelay("update");
- _durableConfigurationStore.update(id, type, attributes);
+ _realDurableConfigurationStore.update(id, type, attributes);
doPostDelay("update");
}
@@ -216,7 +227,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
doPreDelay("update");
- _durableConfigurationStore.update(createIfNecessary, records);
+ _realDurableConfigurationStore.update(createIfNecessary, records);
doPostDelay("update");
}
@@ -224,7 +235,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
public Transaction newTransaction()
{
doPreDelay("beginTran");
- Transaction txn = new SlowTransaction(_realStore.newTransaction());
+ Transaction txn = new SlowTransaction(_realMessageStore.newTransaction());
doPostDelay("beginTran");
return txn;
}
@@ -232,7 +243,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
@Override
public boolean isPersistent()
{
- return _realStore.isPersistent();
+ return _realMessageStore.isPersistent();
}
private class SlowTransaction implements Transaction
@@ -299,28 +310,28 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void activate()
+ public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
{
- _realStore.activate();
+ _realMessageStore.recoverMessageStore(messageRecoveryHandler, transactionLogRecoveryHandler);
}
@Override
public void addEventListener(EventListener eventListener, Event... events)
{
- if (_realStore == null)
+ if (_realMessageStore == null)
{
_eventListeners .put(eventListener, events);
}
else
{
- _realStore.addEventListener(eventListener, events);
+ _realMessageStore.addEventListener(eventListener, events);
}
}
@Override
public String getStoreLocation()
{
- return _realStore.getStoreLocation();
+ return _realMessageStore.getStoreLocation();
}
@Override
@@ -332,7 +343,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
@Override
public void onDelete()
{
- _realStore.onDelete();
+ _realMessageStore.onDelete();
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java
new file mode 100644
index 0000000000..9f244e78a4
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.util.FileUtils;
+
+public class SplitStoreTest extends QpidBrokerTestCase
+{
+ private String _messageStorePath;
+ private String _configStorePath;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ String virtualHostWorkDir = System.getProperty("QPID_WORK") + File.separator + TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST + File.separator;
+ _messageStorePath = virtualHostWorkDir + "messageStore";
+ _configStorePath = virtualHostWorkDir + "configStore";
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Overridden to prevent QBTC starting the Broker.
+ }
+
+ public void testJsonConfigurationStoreWithPersistentMessageStore() throws Exception
+ {
+ Map<String, Object> configurationStoreSettings = new HashMap<String, Object>();
+ configurationStoreSettings.put(DurableConfigurationStore.STORE_TYPE, JsonFileConfigStore.TYPE);
+ configurationStoreSettings.put(DurableConfigurationStore.STORE_PATH, _configStorePath);
+
+ doTest(configurationStoreSettings);
+ }
+
+ public void testSeparateConfigurationAndMessageStoresOfTheSameType() throws Exception
+ {
+ Map<String, Object> configurationStoreSettings = new HashMap<String, Object>();
+ configurationStoreSettings.put(DurableConfigurationStore.STORE_TYPE, getTestProfileMessageStoreType());
+ configurationStoreSettings.put(DurableConfigurationStore.STORE_PATH, _configStorePath);
+
+ doTest(configurationStoreSettings);
+ }
+
+ private void configureAndStartBroker(Map<String, Object> configurationStoreSettings) throws Exception
+ {
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType());
+ messageStoreSettings.put(MessageStore.STORE_PATH, _messageStorePath);
+
+ TestBrokerConfiguration config = getBrokerConfiguration();
+ config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings);
+ config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.CONFIGURATION_STORE_SETTINGS, configurationStoreSettings);
+
+ super.startBroker();
+ }
+
+ private void doTest(Map<String, Object> configurationStoreSettings) throws Exception
+ {
+ configureAndStartBroker(configurationStoreSettings);
+
+ Connection connection = getConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(getTestQueueName());
+ session.createConsumer(queue).close(); // Create durable queue by side effect
+ sendMessage(session, queue, 2);
+ connection.close();
+
+ restartBroker();
+
+ setTestSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false");
+ connection = getConnection();
+ connection.start();
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ session.commit();
+
+ assertNotNull("Message was not received after first restart", message);
+ assertEquals("Unexpected message received after first restart", 0, message.getIntProperty(INDEX));
+
+ stopBroker();
+ File messageStoreFile = new File(_messageStorePath);
+ FileUtils.delete(messageStoreFile, true);
+ assertFalse("Store folder was not deleted", messageStoreFile.exists());
+ super.startBroker();
+
+ connection = getConnection();
+ connection.start();
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ consumer = session.createConsumer(queue);
+ message = consumer.receive(500);
+
+ assertNull("Message was received after store removal", message);
+ }
+
+}