diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-01 09:56:29 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-01 09:56:29 +0000 |
| commit | 2549e2808832606b05383d8383e56d1fafffedee (patch) | |
| tree | 26136053cf5ad8229351948f596c41bbe0d2afb3 /qpid/java/systests/src | |
| parent | 94a44efa32a181bfef063523cb592523d48af392 (diff) | |
| download | qpid-python-2549e2808832606b05383d8383e56d1fafffedee.tar.gz | |
QPID-4970 : [Java Broker] Configure MessageStores based on VirtualHost object not XML Configuration
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1498345 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
5 files changed, 143 insertions, 30 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java index 07965cfa95..69efb7e310 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -23,10 +23,10 @@ package org.apache.qpid.server.store; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.model.VirtualHost; public class QuotaMessageStore extends NullMessageStore { @@ -47,12 +47,27 @@ public class QuotaMessageStore extends NullMessageStore } @Override - public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, + VirtualHost virtualHost) throws Exception { - _persistentSizeHighThreshold = config.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE); - _persistentSizeLowThreshold = config.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, - _persistentSizeHighThreshold); + Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); + _persistentSizeHighThreshold = overfullAttr == null + ? Long.MAX_VALUE + : overfullAttr instanceof Number + ? ((Number)overfullAttr).longValue() + : Long.parseLong(overfullAttr.toString()); + + Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); + + _persistentSizeLowThreshold = overfullAttr == null + ? _persistentSizeHighThreshold + : underfullAttr instanceof Number + ? ((Number)underfullAttr).longValue() + : Long.parseLong(underfullAttr.toString()); + + if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) { _persistentSizeLowThreshold = _persistentSizeHighThreshold; @@ -62,7 +77,7 @@ public class QuotaMessageStore extends NullMessageStore @Override public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { _stateManager.attainState(State.INITIALISED); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index ed76c40717..76250e126a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.server.store; -import org.apache.commons.configuration.Configuration; +import java.util.Collections; +import java.util.Map; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; @@ -29,11 +30,11 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import java.nio.ByteBuffer; import java.util.HashMap; -import java.util.Iterator; public class SlowMessageStore implements MessageStore, DurableConfigurationStore { @@ -51,19 +52,22 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore // ***** MessageStore Interface. public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - Configuration config) throws Exception + ConfigurationRecoveryHandler recoveryHandler, + VirtualHost virtualHost) throws Exception { _logger.info("Starting SlowMessageStore on Virtualhost:" + name); - Configuration delays = config.subset(DELAYS); + Object delaysAttr = virtualHost.getAttribute("slowMessageStoreDelays"); + + Map delays = (delaysAttr instanceof Map) ? (Map) delaysAttr : Collections.emptyMap(); configureDelays(delays); - String messageStoreClass = config.getString("realStore"); + final Object realStoreAttr = virtualHost.getAttribute("realStore"); + String messageStoreClass = realStoreAttr == null ? null : realStoreAttr.toString(); if (delays.containsKey(DEFAULT_DELAY)) { - _defaultDelay = delays.getLong(DEFAULT_DELAY); + _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY))); } if (messageStoreClass != null) @@ -83,25 +87,23 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore _durableConfigurationStore = (DurableConfigurationStore)o; } } - _durableConfigurationStore.configureConfigStore(name, recoveryHandler, config); + _durableConfigurationStore.configureConfigStore(name, recoveryHandler, virtualHost); } - private void configureDelays(Configuration config) + private void configureDelays(Map<Object, Object> config) { - @SuppressWarnings("unchecked") - Iterator<String> delays = config.getKeys(); - while (delays.hasNext()) + for(Map.Entry<Object, Object> entry : config.entrySet()) { - String key = (String) delays.next(); - if (key.endsWith(PRE)) + String key = String.valueOf(entry.getKey()); + if (key.startsWith(PRE)) { - _preDelays.put(key.substring(0, key.length() - PRE.length() - 1), config.getLong(key)); + _preDelays.put(key.substring(PRE.length()), Long.parseLong(String.valueOf(entry.getValue()))); } - else if (key.endsWith(POST)) + else if (key.startsWith(POST)) { - _postDelays.put(key.substring(0, key.length() - POST.length() - 1), config.getLong(key)); + _postDelays.put(key.substring(POST.length()), Long.parseLong(String.valueOf(entry.getValue()))); } } } @@ -156,10 +158,9 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, - Configuration config) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { - _realStore.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config); + _realStore.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler); } public void close() throws Exception diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java new file mode 100644 index 0000000000..a798e6d50e --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java @@ -0,0 +1,78 @@ +package org.apache.qpid.server.store;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.plugin.MessageStoreFactory; + +public class SlowMessageStoreFactory implements MessageStoreFactory +{ + @Override + public String getType() + { + return "SLOW"; + } + + @Override + public MessageStore createMessageStore() + { + return new SlowMessageStore(); + } + + @Override + public Map<String, Object> convertStoreConfiguration(Configuration storeConfiguration) + { + Map<String, Object> convertedMap = new HashMap<String, Object>(); + Configuration delaysConfig = storeConfiguration.subset("delays"); + + @SuppressWarnings("unchecked") + Iterator<String> delays = delaysConfig.getKeys(); + + Map<String,Long> delaysMap = new HashMap<String, Long>(); + + while (delays.hasNext()) + { + String key = delays.next(); + + if (key.endsWith("pre")) + { + delaysMap.put("pre"+key.substring(0, key.length() - 4), delaysConfig.getLong(key)); + } + else if (key.endsWith("post")) + { + delaysMap.put("post"+key.substring(0, key.length() - 5), delaysConfig.getLong(key)); + } + } + + if(!delaysMap.isEmpty()) + { + convertedMap.put("slowMessageStoreDelays",delaysMap); + } + + + convertedMap.put("realStore", storeConfiguration.getString("realStore", null)); + + + return convertedMap; + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java index 4a81480671..182cd5ff0c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java @@ -55,10 +55,10 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase public void setUp() throws Exception { - setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".type", - StandardVirtualHostFactory.TYPE); - setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.class", "org.apache.qpid.server.store.SlowMessageStore"); - setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY)); + final String prefix = "virtualhosts.virtualhost." + VIRTUALHOST; + setVirtualHostConfigurationProperty(prefix + ".type", StandardVirtualHostFactory.TYPE); + setVirtualHostConfigurationProperty(prefix + ".store.class", "org.apache.qpid.server.store.SlowMessageStore"); + setVirtualHostConfigurationProperty(prefix + ".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY)); super.setUp(); diff --git a/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory new file mode 100644 index 0000000000..fdd7a904c3 --- /dev/null +++ b/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.SlowMessageStoreFactory |
