diff options
| author | Keith Wall <kwall@apache.org> | 2012-06-02 11:11:37 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-06-02 11:11:37 +0000 |
| commit | ffb29d39ebba63de972020920a53c1e98b0c9ef8 (patch) | |
| tree | bf14cfb21f91b649148a7c0750ca2058f2dfe051 /qpid/java/bdbstore/src/main | |
| parent | ab2e88eba16f283a7f086de5d856da34784343b3 (diff) | |
| download | qpid-python-ffb29d39ebba63de972020920a53c1e98b0c9ef8.tar.gz | |
QPID-4006: Introduce coalescing sync configuration, rename replication policy configuration into durability, restore designated primary configuration and remove auto-designated primary functionality
Applied patch from Oleksandr Rudyy <orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1345486 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
3 files changed, 48 insertions, 38 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index 81ef6b285e..f8b4319696 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -69,13 +69,12 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess { private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class); - private static final String MUTLI_SYNC = "MUTLI_SYNC"; - private static final String DEFAULT_REPLICATION_POLICY = - MUTLI_SYNC + "," + SyncPolicy.WRITE_NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name(); + private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY); public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; + @SuppressWarnings("serial") private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() {{ /** @@ -105,16 +104,15 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess private String _nodeName; private String _nodeHostPort; private String _helperHostPort; - private String _replicationPolicy; - private Durability _replicationDurability; + private Durability _durability; private String _name; private BDBHAMessageStoreManagerMBean _managedObject; private CommitThreadWrapper _commitThreadWrapper; - private boolean _localMultiSyncCommits; - private boolean _autoDesignatedPrimary; + private boolean _coalescingSync; + private boolean _designatedPrimary; private Map<String, String> _repConfig; @Override @@ -128,22 +126,24 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess _name = name; //Optional configuration - _replicationPolicy = storeConfig.getString("highAvailability.replicationPolicy", DEFAULT_REPLICATION_POLICY).trim(); - _autoDesignatedPrimary = storeConfig.getBoolean("highAvailability.autoDesignatedPrimary", Boolean.TRUE); - - if(_replicationPolicy.startsWith(MUTLI_SYNC)) + String durabilitySetting = storeConfig.getString("highAvailability.durability"); + if (durabilitySetting == null) { - _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.WRITE_NO_SYNC.name())); - _localMultiSyncCommits = true; + _durability = DEFAULT_DURABILITY; } else { - _replicationDurability = Durability.parse(_replicationPolicy); - _localMultiSyncCommits = false; + _durability = Durability.parse(durabilitySetting); } - + _designatedPrimary = storeConfig.getBoolean("highAvailability.designatedPrimary", Boolean.FALSE); + _coalescingSync = storeConfig.getBoolean("highAvailability.coalescingSync", Boolean.TRUE); _repConfig = getConfigMap(REPCONFIG_DEFAULTS, storeConfig, "repConfig"); + if (_coalescingSync && _durability.getLocalSync() == SyncPolicy.SYNC) + { + throw new ConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC + + "! Please set highAvailability.coalescingSync to false in store configuration."); + } _managedObject = new BDBHAMessageStoreManagerMBean(this); _managedObject.register(); @@ -155,7 +155,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess { super.setupStore(storePath, name); - if(_localMultiSyncCommits) + if(_coalescingSync) { _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); _commitThreadWrapper.startCommitThread(); @@ -172,16 +172,19 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess LOGGER.info("Node name " + _nodeName); LOGGER.info("Node host port " + _nodeHostPort); LOGGER.info("Helper host port " + _helperHostPort); - LOGGER.info("Replication policy " + _replicationPolicy); + LOGGER.info("Durability " + _durability); + LOGGER.info("Coalescing sync " + _coalescingSync); + LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary); } final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort); replicationConfig.setHelperHosts(_helperHostPort); + replicationConfig.setDesignatedPrimary(_designatedPrimary); setReplicationConfigProperties(replicationConfig); final EnvironmentConfig envConfig = createEnvironmentConfig(); - envConfig.setDurability(_replicationDurability); + envConfig.setDurability(_durability); ReplicatedEnvironment replicatedEnvironment = null; try @@ -220,14 +223,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess getEnvironment().flushLog(true); super.activate(); - - //For replica groups with 2 electable nodes, set the new master to be the - //designated primary, such that it can continue working if the replica goes - //down and leaves it without a 'majority of 2'. - if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 && _autoDesignatedPrimary) - { - setDesignatedPrimary(true); - } } @Override @@ -265,9 +260,14 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess return _helperHostPort; } - public String getReplicationPolicy() + public String getDurability() + { + return _durability.toString(); + } + + public boolean isCoalescingSync() { - return _replicationPolicy; + return _coalescingSync; } public String getNodeState() @@ -376,7 +376,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess throw de; } - if(_localMultiSyncCommits) + if(_coalescingSync) { return _commitThreadWrapper.commit(tx, syncCommit); } @@ -395,7 +395,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess try { - if(_localMultiSyncCommits) + if(_coalescingSync) { _commitThreadWrapper.stopCommitThread(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java index 731b7144f9..c2c7bf4c86 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java @@ -107,11 +107,11 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M } @Override - public String getReplicationPolicy() throws IOException, JMException + public String getDurability() throws IOException, JMException { try { - return _store.getReplicationPolicy(); + return _store.getDurability(); } catch (RuntimeException e) { @@ -120,6 +120,13 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M } } + + @Override + public boolean getCoalescingSync() throws IOException, JMException + { + return _store.isCoalescingSync(); + } + @Override public String getNodeState() throws IOException, JMException { @@ -204,5 +211,4 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M } } - } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java index 5e45335dad..6499ea04e0 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java @@ -36,9 +36,10 @@ public interface ManagedBDBHAMessageStore public static final String ATTR_NODE_NAME = "NodeName"; public static final String ATTR_NODE_HOST_PORT = "NodeHostPort"; public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort"; - public static final String ATTR_REPLICATION_POLICY = "ReplicationPolicy"; + public static final String ATTR_DURABILITY = "Durability"; public static final String ATTR_NODE_STATE = "NodeState"; public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary"; + public static final String ATTR_COALESCING_SYNC = "CoalescingSync"; @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group") String getGroupName() throws IOException, JMException; @@ -55,13 +56,16 @@ public interface ManagedBDBHAMessageStore @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members") String getHelperHostPort() throws IOException, JMException; - @MBeanAttribute(name=ATTR_REPLICATION_POLICY, description="Replication policy") - String getReplicationPolicy() throws IOException, JMException; + @MBeanAttribute(name=ATTR_DURABILITY, description="Durability") + String getDurability() throws IOException, JMException; @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.") boolean getDesignatedPrimary() throws IOException, JMException; - @MBeanOperation(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not") + @MBeanAttribute(name=ATTR_COALESCING_SYNC, description="Coalescing sync flag. Applicable to the master sync policies NO_SYNC and WRITE_NO_SYNC only.") + boolean getCoalescingSync() throws IOException, JMException; + + @MBeanAttribute(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not") TabularData getAllNodesInGroup() throws IOException, JMException; @MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group") |
