summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-03-14 16:39:47 +0000
committerKeith Wall <kwall@apache.org>2014-03-14 16:39:47 +0000
commitec486999608568e37a55dc9c81d9be133d95ebc3 (patch)
tree87d6446e97cfdca321b1faff6f24a3010df4cdff /qpid/java/bdbstore/src/main
parentdb26915f9b2edfa410c094162bec78b9d2010b24 (diff)
downloadqpid-python-ec486999608568e37a55dc9c81d9be133d95ebc3.tar.gz
QPID-5624: Introduce messageStoreSettings VH attribute and move all message store related attributes into messageStoreSettings map
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1577606 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java19
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java50
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java3
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java10
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java41
6 files changed, 89 insertions, 42 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
index b2ec96f9f8..6fb84b8a4d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
@@ -23,6 +23,8 @@ import java.util.Map;
import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -52,11 +54,18 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory
@Override
public void validateAttributes(Map<String, Object> attributes)
{
- validateAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH, String.class, attributes);
- validateAttribute("haGroupName", String.class, attributes);
- validateAttribute("haNodeName", String.class, attributes);
- validateAttribute("haNodeAddress", String.class, attributes);
- validateAttribute("haHelperAddress", String.class, attributes);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>)attributes.get(org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS);
+ if (messageStoreSettings == null)
+ {
+ throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + "' is required.");
+ }
+
+ validateAttribute(MessageStore.STORE_PATH, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_NAME, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, String.class, messageStoreSettings);
}
private void validateAttribute(String attrName, Class<?> clazz, Map<String, Object> attributes)
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 35dae4b800..c8550b2114 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -20,12 +20,6 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.IntegerBinding;
-import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.*;
-import com.sleepycat.je.Transaction;
-
import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
@@ -36,16 +30,32 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.*;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.EventManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.State;
+import org.apache.qpid.server.store.StateManager;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMemoryMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
import org.apache.qpid.server.store.berkeleydb.entry.Xid;
@@ -59,6 +69,21 @@ import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
import org.apache.qpid.util.FileUtils;
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
*
@@ -72,8 +97,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
public static final int VERSION = 7;
- public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
-
private static final int LOCK_RETRY_ATTEMPTS = 5;
private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS";
private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
@@ -119,7 +142,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
{
- _type = environmentFacadeFactory.getType();;
+ _type = environmentFacadeFactory.getType();
_environmentFacadeFactory = environmentFacadeFactory;
_stateManager = new StateManager(_eventManager);
}
@@ -218,8 +241,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private void configure(VirtualHost virtualHost, boolean isMessageStore) throws StoreException
{
- Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
- Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
+ Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
+ Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
+ Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
_persistentSizeHighThreshold = overfullAttr == null ? -1l :
overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
index 8f2086a25c..e2b30f6740 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
@@ -52,12 +52,14 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi
@Override
public void validateAttributes(Map<String, Object> attributes)
{
- if(getType().equals(attributes.get(VirtualHost.STORE_TYPE)))
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ if(getType().equals(messageStoreSettings.get(MessageStore.STORE_TYPE)))
{
- Object storePath = attributes.get(VirtualHost.STORE_PATH);
+ Object storePath = messageStoreSettings.get(MessageStore.STORE_PATH);
if(!(storePath instanceof String))
{
- throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_PATH
+"' is required and must be of type String.");
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
index b784e436b9..d242790efb 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
@@ -24,8 +24,9 @@ import org.apache.qpid.server.model.VirtualHost;
public interface EnvironmentFacadeFactory
{
+ public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
- EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore);
+ EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore);
String getType();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
index 384ceba98a..7fdae6b3ee 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
@@ -26,30 +26,32 @@ import java.util.Map;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
@SuppressWarnings("unchecked")
@Override
- public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore)
+ public EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore)
{
+ String name = virtualHost.getName();
+ Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
Map<String, String> envConfigMap = new HashMap<String, String>();
envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
- Object environmentConfigurationAttributes = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION);
+ Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION);
if (environmentConfigurationAttributes instanceof Map)
{
envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes);
}
- String name = virtualHost.getName();
final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name;
String storeLocation;
if(isMessageStore)
{
- storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH);
if(storeLocation == null)
{
storeLocation = defaultPath;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index cd53afe891..4df62b1d0f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb.replication;
import java.util.Map;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
@@ -32,47 +33,56 @@ import com.sleepycat.je.Durability.SyncPolicy;
public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
-
+ public static final String DURABILITY = "haDurability";
+ public static final String GROUP_NAME = "haGroupName";
+ public static final String HELPER_ADDRESS = "haHelperAddress";
+ public static final String NODE_ADDRESS = "haNodeAddress";
+ public static final String NODE_NAME = "haNodeName";
+ public static final String REPLICATION_CONFIG = "haReplicationConfig";
+ public static final String COALESCING_SYNC = "haCoalescingSync";
+ public static final String DESIGNATED_PRIMARY = "haDesignatedPrimary";
+
private static final int DEFAULT_NODE_PRIORITY = 1;
private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
ReplicaAckPolicy.SIMPLE_MAJORITY);
private static final boolean DEFAULT_COALESCING_SYNC = true;
-
-
@Override
- public EnvironmentFacade createEnvironmentFacade(final VirtualHost virtualHost, boolean isMessageStore)
+ public EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore)
{
+ final Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration()
{
@Override
public boolean isDesignatedPrimary()
{
- return convertBoolean(virtualHost.getAttribute("haDesignatedPrimary"), false);
+ return convertBoolean(messageStoreSettings.get(DESIGNATED_PRIMARY), false);
}
@Override
public boolean isCoalescingSync()
{
- return convertBoolean(virtualHost.getAttribute("haCoalescingSync"), DEFAULT_COALESCING_SYNC);
+ return convertBoolean(messageStoreSettings.get(COALESCING_SYNC), DEFAULT_COALESCING_SYNC);
}
@Override
public String getStorePath()
{
- return (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ return (String) messageStoreSettings.get(MessageStore.STORE_PATH);
}
+ @SuppressWarnings("unchecked")
@Override
public Map<String, String> getParameters()
{
- return (Map<String, String>) virtualHost.getAttribute("bdbEnvironmentConfig");
+ return (Map<String, String>) messageStoreSettings.get(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION);
}
+ @SuppressWarnings("unchecked")
@Override
public Map<String, String> getReplicationParameters()
{
- return (Map<String, String>) virtualHost.getAttribute("haReplicationConfig");
+ return (Map<String, String>) messageStoreSettings.get(REPLICATION_CONFIG);
}
@Override
@@ -87,36 +97,35 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
return DEFAULT_NODE_PRIORITY;
}
-
-
@Override
public String getName()
{
- return (String)virtualHost.getAttribute("haNodeName");
+ return (String)messageStoreSettings.get(NODE_NAME);
}
@Override
public String getHostPort()
{
- return (String)virtualHost.getAttribute("haNodeAddress");
+ return (String)messageStoreSettings.get(NODE_ADDRESS);
}
@Override
public String getHelperHostPort()
{
- return (String)virtualHost.getAttribute("haHelperAddress");
+ return (String)messageStoreSettings.get(HELPER_ADDRESS);
}
@Override
public String getGroupName()
{
- return (String)virtualHost.getAttribute("haGroupName");
+ return (String)messageStoreSettings.get(GROUP_NAME);
}
@Override
public String getDurability()
{
- return virtualHost.getAttribute("haDurability") == null ? DEFAULT_DURABILITY.toString() : (String)virtualHost.getAttribute("haDurability");
+ String durability = (String)messageStoreSettings.get(DURABILITY);
+ return durability == null ? DEFAULT_DURABILITY.toString() : durability;
}
};
return new ReplicatedEnvironmentFacade(configuration);