summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-05-24 08:21:29 +0000
committerKeith Wall <kwall@apache.org>2012-05-24 08:21:29 +0000
commit2255b3813bbd5d4557596afc83c4113f16b83b9b (patch)
tree381ca89ea671c1d893b7c03de8bcd2f457e24d7f /qpid/java
parent96e9893c318cf06297b290e3c9f21fa5eb7efb7c (diff)
downloadqpid-python-2255b3813bbd5d4557596afc83c4113f16b83b9b.tar.gz
QPID-4006: Remove BDB StateChangeListener during Environment#close() to avoid processing an unnecessary DETACHED event (which was causing a stack trace).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1342169 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java181
1 files changed, 108 insertions, 73 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
index 0f1349527c..38cee1dd3a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -66,11 +66,14 @@ import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore
{
+ private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
+
private static final String MUTLI_SYNC = "MUTLI_SYNC";
private static final String DEFAULT_REPLICATION_POLICY =
MUTLI_SYNC + "," + SyncPolicy.WRITE_NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name();
- private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
+ public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
+ public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
private String _groupName;
private String _nodeName;
@@ -83,10 +86,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
private BDBHAMessageStoreManagerMBean _managedObject;
- public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
-
- public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
-
private CommitThreadWrapper _commitThreadWrapper;
private boolean _localMultiSyncCommits;
private boolean _autoDesignatedPrimary;
@@ -137,16 +136,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
}
}
- private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException
- {
- if (!config.containsKey(key))
- {
- throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: "
- + key.replace('.', '/'));
- }
- return config.getString(key);
- }
-
@Override
protected Environment createEnvironment(File environmentPath) throws DatabaseException
{
@@ -199,50 +188,29 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
}
@Override
- public synchronized void passivate()
+ public synchronized void activate() throws Exception
{
- if (_stateManager.isNotInState(State.INITIALISED))
- {
- LOGGER.debug("Store becoming passive");
- _stateManager.attainState(State.INITIALISED);
- }
- }
+ // Before proceeding, perform a log flush with an fsync
+ getEnvironment().flushLog(true);
- @Override
- protected void closeInternal() throws Exception
- {
- try
- {
- if(_localMultiSyncCommits)
- {
- _commitThreadWrapper.stopCommitThread();
- }
- super.closeInternal();
- }
- finally
+ super.activate();
+
+ //For replica groups with 2 electable nodes, set the new master to be the
+ //designated primary, such that it can continue working if the replica goes
+ //down and leaves it without a 'majority of 2'.
+ if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 && _autoDesignatedPrimary)
{
- if (_managedObject != null)
- {
- _managedObject.unregister();
- }
+ setDesignatedPrimary(true);
}
}
@Override
- protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException
+ public synchronized void passivate()
{
- // Using commit() instead of commitNoSync() for the HA store to allow
- // the HA durability configuration to influence resulting behaviour.
- tx.commit();
-
-
- if(_localMultiSyncCommits)
- {
- return _commitThreadWrapper.commit(tx, syncCommit);
- }
- else
+ if (_stateManager.isNotInState(State.INITIALISED))
{
- return StoreFuture.IMMEDIATE_FUTURE;
+ LOGGER.debug("Store becoming passive");
+ _stateManager.attainState(State.INITIALISED);
}
}
@@ -330,7 +298,10 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
replicatedEnvironment.setRepMutableConfig(newConfig);
}
- LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group");
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group");
+ }
}
catch (DatabaseException e)
{
@@ -361,6 +332,61 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
}
}
+ @Override
+ protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException
+ {
+ // Using commit() instead of commitNoSync() for the HA store to allow
+ // the HA durability configuration to influence resulting behaviour.
+ tx.commit();
+
+ if(_localMultiSyncCommits)
+ {
+ return _commitThreadWrapper.commit(tx, syncCommit);
+ }
+ else
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+ }
+
+ @Override
+ protected void closeInternal() throws Exception
+ {
+ try
+ {
+ substituteNoOpStateChangeListenerOn(getReplicatedEnvironment());
+
+ try
+ {
+ if(_localMultiSyncCommits)
+ {
+ _commitThreadWrapper.stopCommitThread();
+ }
+ }
+ finally
+ {
+ super.closeInternal();
+ }
+ }
+ finally
+ {
+ if (_managedObject != null)
+ {
+ _managedObject.unregister();
+ }
+ }
+ }
+
+ /**
+ * Replicas emit a state change event {@link com.sleepycat.je.rep.ReplicatedEnvironment.State#DETACHED} during
+ * {@link Environment#close()}. We replace the StateChangeListener so we silently ignore this state change.
+ */
+ private void substituteNoOpStateChangeListenerOn(ReplicatedEnvironment replicatedEnvironment)
+ {
+ LOGGER.debug("Substituting no-op state change listener for environment close");
+ replicatedEnvironment.setStateChangeListener(new NoOpStateChangeListener());
+ }
+
private ReplicationGroupAdmin createReplicationGroupAdmin()
{
final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
@@ -372,6 +398,29 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
return new ReplicationGroupAdmin(_groupName, helpers);
}
+
+ private void setReplicationConfigProperties(ReplicationConfig replicationConfig)
+ {
+ for (Map.Entry<String, String> configItem : _repConfigMap.entrySet())
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ }
+ replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+ }
+
+ private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException
+ {
+ if (!config.containsKey(key))
+ {
+ throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: "
+ + key.replace('.', '/'));
+ }
+ return config.getString(key);
+ }
+
private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
{
private final Executor _executor = Executors.newSingleThreadExecutor();
@@ -381,7 +430,10 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
{
com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState();
- LOGGER.info("Received BDB event indicating transition to state " + state);
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Received BDB event indicating transition to state " + state);
+ }
switch (state)
{
@@ -508,29 +560,12 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
}
}
- @Override
- public synchronized void activate() throws Exception
- {
- // Before proceeding, perform a log flush with an fsync
- getEnvironment().flushLog(true);
-
- super.activate();
-
- //For replica groups with 2 electable nodes, set the new master to be the
- //designated primary, such that it can continue working if the replica goes
- //down and leaves it without a 'majority of 2'.
- if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 && _autoDesignatedPrimary)
- {
- setDesignatedPrimary(true);
- }
- }
-
- private void setReplicationConfigProperties(ReplicationConfig replicationConfig)
+ private class NoOpStateChangeListener implements StateChangeListener
{
- for (Map.Entry<String, String> configItem : _repConfigMap.entrySet())
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent)
+ throws RuntimeException
{
- LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
- replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
}
}
}