summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-05-22 18:18:50 +0000
committerKeith Wall <kwall@apache.org>2012-05-22 18:18:50 +0000
commit5b4f8623bd816668c3526088b1dd474f7573fcc3 (patch)
tree4de57e643be308df7ee9b0768627e1e2d35dbede /qpid/java/bdbstore/src/main
parent31295e25f33cb49f1ad23fb2e75ff252df40471e (diff)
downloadqpid-python-5b4f8623bd816668c3526088b1dd474f7573fcc3.tar.gz
QPID-4006: BDB HA. Make passivation async to avoid holding up the BDB thread. Introduce VirtualHost ERROR state to be used when virtual host is unable to activate or passivate itself completely. Change MULTISYNC mode to use WRITE_NO_SYNC. Stop relying on Monitor nodes to perform some tests.
Work of Robbie Gemmell <robbie@apache.org> and myself. squashme git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1341584 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java110
1 files changed, 78 insertions, 32 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
index f887c8ce36..0f1349527c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -67,7 +68,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
{
private static final String MUTLI_SYNC = "MUTLI_SYNC";
private static final String DEFAULT_REPLICATION_POLICY =
- MUTLI_SYNC + "," + SyncPolicy.NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name();
+ MUTLI_SYNC + "," + SyncPolicy.WRITE_NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name();
private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
@@ -107,7 +108,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
if(_replicationPolicy.startsWith(MUTLI_SYNC))
{
- _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.SYNC.name()));
+ _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.WRITE_NO_SYNC.name()));
_localMultiSyncCommits = true;
}
else
@@ -388,11 +389,11 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
activateStoreAsync();
break;
case REPLICA:
- passivateStore();
+ passivateStoreAsync();
break;
case DETACHED:
LOGGER.error("BDB replicated node in detached state, therefore passivating.");
- passivateStore();
+ passivateStoreAsync();
break;
case UNKNOWN:
LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
@@ -403,20 +404,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
}
}
- /** synchronously calls passivate. This is acceptable because {@link HAMessageStore#passivate()} is expected to be fast */
- private void passivateStore()
- {
- try
- {
- passivate();
- }
- catch(Exception e)
- {
- LOGGER.error("Unable to passivate", e);
- throw new RuntimeException("Unable to passivate", e);
- }
- }
-
/**
* Calls {@link MessageStore#activate()}.
*
@@ -429,34 +416,93 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
*/
private void activateStoreAsync()
{
+ String threadName = "BDBHANodeActivationThread-" + _name;
+ executeStateChangeAsync(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ activate();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to activate on hearing MASTER change event",e);
+ throw e;
+ }
+ return null;
+ }
+ }, threadName);
+ }
+
+ /**
+ * Calls {@link #passivate()}.
+ *
+ * <p/>
+ * This is done a background thread, in line with
+ * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
+ * passivation due to the effect of state change listeners.
+ */
+ private void passivateStoreAsync()
+ {
+ String threadName = "BDBHANodePassivationThread-" + _name;
+ executeStateChangeAsync(new Callable<Void>()
+ {
+
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ passivate();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event",e);
+ throw e;
+ }
+ return null;
+ }
+ }, threadName);
+ }
+
+ private void executeStateChangeAsync(final Callable<Void> callable, final String threadName)
+ {
final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger();
_executor.execute(new Runnable()
{
- private static final String _THREAD_NAME = "BDBHANodeActivationThread";
@Override
public void run()
{
- Thread.currentThread().setName(_THREAD_NAME);
- CurrentActor.set(new AbstractActor(_rootLogger)
+ final String originalThreadName = Thread.currentThread().getName();
+ Thread.currentThread().setName(threadName);
+ try
{
- @Override
- public String getLogMessage()
+ CurrentActor.set(new AbstractActor(_rootLogger)
+ {
+ @Override
+ public String getLogMessage()
+ {
+ return threadName;
+ }
+ });
+
+ try
{
- return _THREAD_NAME;
+ callable.call();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Exception during state change", e);
}
- });
-
- try
- {
- activate();
}
- catch (Exception e)
+ finally
{
- LOGGER.error("Failed to activate on hearing MASTER change event",e);
+ Thread.currentThread().setName(originalThreadName);
}
-
}
});
}