diff options
| author | Keith Wall <kwall@apache.org> | 2012-05-24 08:21:29 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-05-24 08:21:29 +0000 |
| commit | 2255b3813bbd5d4557596afc83c4113f16b83b9b (patch) | |
| tree | 381ca89ea671c1d893b7c03de8bcd2f457e24d7f /qpid/java | |
| parent | 96e9893c318cf06297b290e3c9f21fa5eb7efb7c (diff) | |
| download | qpid-python-2255b3813bbd5d4557596afc83c4113f16b83b9b.tar.gz | |
QPID-4006: Remove BDB StateChangeListener during Environment#close() to avoid processing an unnecessary DETACHED event (which was causing a stack trace).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1342169 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java | 181 |
1 files changed, 108 insertions, 73 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index 0f1349527c..38cee1dd3a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -66,11 +66,14 @@ import com.sleepycat.je.rep.util.ReplicationGroupAdmin; public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore { + private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class); + private static final String MUTLI_SYNC = "MUTLI_SYNC"; private static final String DEFAULT_REPLICATION_POLICY = MUTLI_SYNC + "," + SyncPolicy.WRITE_NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name(); - private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class); + public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; + public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; private String _groupName; private String _nodeName; @@ -83,10 +86,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess private BDBHAMessageStoreManagerMBean _managedObject; - public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; - - public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; - private CommitThreadWrapper _commitThreadWrapper; private boolean _localMultiSyncCommits; private boolean _autoDesignatedPrimary; @@ -137,16 +136,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess } } - private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException - { - if (!config.containsKey(key)) - { - throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: " - + key.replace('.', '/')); - } - return config.getString(key); - } - @Override protected Environment createEnvironment(File environmentPath) throws DatabaseException { @@ -199,50 +188,29 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess } @Override - public synchronized void passivate() + public synchronized void activate() throws Exception { - if (_stateManager.isNotInState(State.INITIALISED)) - { - LOGGER.debug("Store becoming passive"); - _stateManager.attainState(State.INITIALISED); - } - } + // Before proceeding, perform a log flush with an fsync + getEnvironment().flushLog(true); - @Override - protected void closeInternal() throws Exception - { - try - { - if(_localMultiSyncCommits) - { - _commitThreadWrapper.stopCommitThread(); - } - super.closeInternal(); - } - finally + super.activate(); + + //For replica groups with 2 electable nodes, set the new master to be the + //designated primary, such that it can continue working if the replica goes + //down and leaves it without a 'majority of 2'. + if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 && _autoDesignatedPrimary) { - if (_managedObject != null) - { - _managedObject.unregister(); - } + setDesignatedPrimary(true); } } @Override - protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException + public synchronized void passivate() { - // Using commit() instead of commitNoSync() for the HA store to allow - // the HA durability configuration to influence resulting behaviour. - tx.commit(); - - - if(_localMultiSyncCommits) - { - return _commitThreadWrapper.commit(tx, syncCommit); - } - else + if (_stateManager.isNotInState(State.INITIALISED)) { - return StoreFuture.IMMEDIATE_FUTURE; + LOGGER.debug("Store becoming passive"); + _stateManager.attainState(State.INITIALISED); } } @@ -330,7 +298,10 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess replicatedEnvironment.setRepMutableConfig(newConfig); } - LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group"); + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group"); + } } catch (DatabaseException e) { @@ -361,6 +332,61 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess } } + @Override + protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException + { + // Using commit() instead of commitNoSync() for the HA store to allow + // the HA durability configuration to influence resulting behaviour. + tx.commit(); + + if(_localMultiSyncCommits) + { + return _commitThreadWrapper.commit(tx, syncCommit); + } + else + { + return StoreFuture.IMMEDIATE_FUTURE; + } + } + + @Override + protected void closeInternal() throws Exception + { + try + { + substituteNoOpStateChangeListenerOn(getReplicatedEnvironment()); + + try + { + if(_localMultiSyncCommits) + { + _commitThreadWrapper.stopCommitThread(); + } + } + finally + { + super.closeInternal(); + } + } + finally + { + if (_managedObject != null) + { + _managedObject.unregister(); + } + } + } + + /** + * Replicas emit a state change event {@link com.sleepycat.je.rep.ReplicatedEnvironment.State#DETACHED} during + * {@link Environment#close()}. We replace the StateChangeListener so we silently ignore this state change. + */ + private void substituteNoOpStateChangeListenerOn(ReplicatedEnvironment replicatedEnvironment) + { + LOGGER.debug("Substituting no-op state change listener for environment close"); + replicatedEnvironment.setStateChangeListener(new NoOpStateChangeListener()); + } + private ReplicationGroupAdmin createReplicationGroupAdmin() { final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); @@ -372,6 +398,29 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess return new ReplicationGroupAdmin(_groupName, helpers); } + + private void setReplicationConfigProperties(ReplicationConfig replicationConfig) + { + for (Map.Entry<String, String> configItem : _repConfigMap.entrySet()) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); + } + replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue()); + } + } + + private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException + { + if (!config.containsKey(key)) + { + throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: " + + key.replace('.', '/')); + } + return config.getString(key); + } + private class BDBHAMessageStoreStateChangeListener implements StateChangeListener { private final Executor _executor = Executors.newSingleThreadExecutor(); @@ -381,7 +430,10 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess { com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState(); - LOGGER.info("Received BDB event indicating transition to state " + state); + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Received BDB event indicating transition to state " + state); + } switch (state) { @@ -508,29 +560,12 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess } } - @Override - public synchronized void activate() throws Exception - { - // Before proceeding, perform a log flush with an fsync - getEnvironment().flushLog(true); - - super.activate(); - - //For replica groups with 2 electable nodes, set the new master to be the - //designated primary, such that it can continue working if the replica goes - //down and leaves it without a 'majority of 2'. - if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 && _autoDesignatedPrimary) - { - setDesignatedPrimary(true); - } - } - - private void setReplicationConfigProperties(ReplicationConfig replicationConfig) + private class NoOpStateChangeListener implements StateChangeListener { - for (Map.Entry<String, String> configItem : _repConfigMap.entrySet()) + @Override + public void stateChange(StateChangeEvent stateChangeEvent) + throws RuntimeException { - LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); - replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue()); } } } |
