summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-06-02 11:11:37 +0000
committerKeith Wall <kwall@apache.org>2012-06-02 11:11:37 +0000
commitffb29d39ebba63de972020920a53c1e98b0c9ef8 (patch)
treebf14cfb21f91b649148a7c0750ca2058f2dfe051 /qpid/java/bdbstore/src/main
parentab2e88eba16f283a7f086de5d856da34784343b3 (diff)
downloadqpid-python-ffb29d39ebba63de972020920a53c1e98b0c9ef8.tar.gz
QPID-4006: Introduce coalescing sync configuration, rename replication policy configuration into durability, restore designated primary configuration and remove auto-designated primary functionality
Applied patch from Oleksandr Rudyy <orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1345486 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java62
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java12
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java12
3 files changed, 48 insertions, 38 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
index 81ef6b285e..f8b4319696 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -69,13 +69,12 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
{
private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
- private static final String MUTLI_SYNC = "MUTLI_SYNC";
- private static final String DEFAULT_REPLICATION_POLICY =
- MUTLI_SYNC + "," + SyncPolicy.WRITE_NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name();
+ private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY);
public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+ @SuppressWarnings("serial")
private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
{{
/**
@@ -105,16 +104,15 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
private String _nodeName;
private String _nodeHostPort;
private String _helperHostPort;
- private String _replicationPolicy;
- private Durability _replicationDurability;
+ private Durability _durability;
private String _name;
private BDBHAMessageStoreManagerMBean _managedObject;
private CommitThreadWrapper _commitThreadWrapper;
- private boolean _localMultiSyncCommits;
- private boolean _autoDesignatedPrimary;
+ private boolean _coalescingSync;
+ private boolean _designatedPrimary;
private Map<String, String> _repConfig;
@Override
@@ -128,22 +126,24 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
_name = name;
//Optional configuration
- _replicationPolicy = storeConfig.getString("highAvailability.replicationPolicy", DEFAULT_REPLICATION_POLICY).trim();
- _autoDesignatedPrimary = storeConfig.getBoolean("highAvailability.autoDesignatedPrimary", Boolean.TRUE);
-
- if(_replicationPolicy.startsWith(MUTLI_SYNC))
+ String durabilitySetting = storeConfig.getString("highAvailability.durability");
+ if (durabilitySetting == null)
{
- _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.WRITE_NO_SYNC.name()));
- _localMultiSyncCommits = true;
+ _durability = DEFAULT_DURABILITY;
}
else
{
- _replicationDurability = Durability.parse(_replicationPolicy);
- _localMultiSyncCommits = false;
+ _durability = Durability.parse(durabilitySetting);
}
-
+ _designatedPrimary = storeConfig.getBoolean("highAvailability.designatedPrimary", Boolean.FALSE);
+ _coalescingSync = storeConfig.getBoolean("highAvailability.coalescingSync", Boolean.TRUE);
_repConfig = getConfigMap(REPCONFIG_DEFAULTS, storeConfig, "repConfig");
+ if (_coalescingSync && _durability.getLocalSync() == SyncPolicy.SYNC)
+ {
+ throw new ConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC
+ + "! Please set highAvailability.coalescingSync to false in store configuration.");
+ }
_managedObject = new BDBHAMessageStoreManagerMBean(this);
_managedObject.register();
@@ -155,7 +155,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
{
super.setupStore(storePath, name);
- if(_localMultiSyncCommits)
+ if(_coalescingSync)
{
_commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment());
_commitThreadWrapper.startCommitThread();
@@ -172,16 +172,19 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
LOGGER.info("Node name " + _nodeName);
LOGGER.info("Node host port " + _nodeHostPort);
LOGGER.info("Helper host port " + _helperHostPort);
- LOGGER.info("Replication policy " + _replicationPolicy);
+ LOGGER.info("Durability " + _durability);
+ LOGGER.info("Coalescing sync " + _coalescingSync);
+ LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary);
}
final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort);
replicationConfig.setHelperHosts(_helperHostPort);
+ replicationConfig.setDesignatedPrimary(_designatedPrimary);
setReplicationConfigProperties(replicationConfig);
final EnvironmentConfig envConfig = createEnvironmentConfig();
- envConfig.setDurability(_replicationDurability);
+ envConfig.setDurability(_durability);
ReplicatedEnvironment replicatedEnvironment = null;
try
@@ -220,14 +223,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
getEnvironment().flushLog(true);
super.activate();
-
- //For replica groups with 2 electable nodes, set the new master to be the
- //designated primary, such that it can continue working if the replica goes
- //down and leaves it without a 'majority of 2'.
- if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 && _autoDesignatedPrimary)
- {
- setDesignatedPrimary(true);
- }
}
@Override
@@ -265,9 +260,14 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
return _helperHostPort;
}
- public String getReplicationPolicy()
+ public String getDurability()
+ {
+ return _durability.toString();
+ }
+
+ public boolean isCoalescingSync()
{
- return _replicationPolicy;
+ return _coalescingSync;
}
public String getNodeState()
@@ -376,7 +376,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
throw de;
}
- if(_localMultiSyncCommits)
+ if(_coalescingSync)
{
return _commitThreadWrapper.commit(tx, syncCommit);
}
@@ -395,7 +395,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
try
{
- if(_localMultiSyncCommits)
+ if(_coalescingSync)
{
_commitThreadWrapper.stopCommitThread();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
index 731b7144f9..c2c7bf4c86 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
@@ -107,11 +107,11 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
}
@Override
- public String getReplicationPolicy() throws IOException, JMException
+ public String getDurability() throws IOException, JMException
{
try
{
- return _store.getReplicationPolicy();
+ return _store.getDurability();
}
catch (RuntimeException e)
{
@@ -120,6 +120,13 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
}
}
+
+ @Override
+ public boolean getCoalescingSync() throws IOException, JMException
+ {
+ return _store.isCoalescingSync();
+ }
+
@Override
public String getNodeState() throws IOException, JMException
{
@@ -204,5 +211,4 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
}
}
-
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
index 5e45335dad..6499ea04e0 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
@@ -36,9 +36,10 @@ public interface ManagedBDBHAMessageStore
public static final String ATTR_NODE_NAME = "NodeName";
public static final String ATTR_NODE_HOST_PORT = "NodeHostPort";
public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort";
- public static final String ATTR_REPLICATION_POLICY = "ReplicationPolicy";
+ public static final String ATTR_DURABILITY = "Durability";
public static final String ATTR_NODE_STATE = "NodeState";
public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary";
+ public static final String ATTR_COALESCING_SYNC = "CoalescingSync";
@MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group")
String getGroupName() throws IOException, JMException;
@@ -55,13 +56,16 @@ public interface ManagedBDBHAMessageStore
@MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members")
String getHelperHostPort() throws IOException, JMException;
- @MBeanAttribute(name=ATTR_REPLICATION_POLICY, description="Replication policy")
- String getReplicationPolicy() throws IOException, JMException;
+ @MBeanAttribute(name=ATTR_DURABILITY, description="Durability")
+ String getDurability() throws IOException, JMException;
@MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.")
boolean getDesignatedPrimary() throws IOException, JMException;
- @MBeanOperation(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not")
+ @MBeanAttribute(name=ATTR_COALESCING_SYNC, description="Coalescing sync flag. Applicable to the master sync policies NO_SYNC and WRITE_NO_SYNC only.")
+ boolean getCoalescingSync() throws IOException, JMException;
+
+ @MBeanAttribute(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not")
TabularData getAllNodesInGroup() throws IOException, JMException;
@MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group")