summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
author Keith Wall <kwall@apache.org> 2014-10-21 16:11:49 +0000
committer Keith Wall <kwall@apache.org> 2014-10-21 16:11:49 +0000
commit cfb1b1056e35892c04fbdafd486913bba5054587 (patch)
tree c045267a61aa7cb2b4299d1ba3275157aec782b6 /qpid/java
parent f9b8cebdef0951eb643e6dbd6a41b3f2a70c5104 (diff)
download qpid-python-cfb1b1056e35892c04fbdafd486913bba5054587.tar.gz
QPID-6154: [Java Broker] HA - Handle rollback of node when use of weak durability has allowed nodes to diverge
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1633407 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r-- qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java | 62
-rw-r--r-- qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java | 5
-rw-r--r-- qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java | 6
-rw-r--r-- qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java | 93
-rw-r--r-- qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java | 29
-rw-r--r-- qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties | 2
6 files changed, 175 insertions, 22 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 8877f6a156..d5425019a8 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -53,27 +53,12 @@ import com.sleepycat.je.Durability.ReplicaAckPolicy;
import com.sleepycat.je.Durability.SyncPolicy;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
+import com.sleepycat.je.ExceptionEvent;
import com.sleepycat.je.Sequence;
import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
-import com.sleepycat.je.rep.AppStateMonitor;
-import com.sleepycat.je.rep.InsufficientAcksException;
-import com.sleepycat.je.rep.InsufficientLogException;
-import com.sleepycat.je.rep.InsufficientReplicasException;
-import com.sleepycat.je.rep.NetworkRestore;
-import com.sleepycat.je.rep.NetworkRestoreConfig;
-import com.sleepycat.je.rep.NodeState;
-import com.sleepycat.je.rep.NodeType;
-import com.sleepycat.je.rep.RepInternal;
-import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.je.rep.ReplicationConfig;
-import com.sleepycat.je.rep.ReplicationGroup;
-import com.sleepycat.je.rep.ReplicationMutableConfig;
-import com.sleepycat.je.rep.ReplicationNode;
-import com.sleepycat.je.rep.RestartRequiredException;
-import com.sleepycat.je.rep.StateChangeEvent;
-import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.*;
import com.sleepycat.je.rep.util.DbPing;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
import com.sleepycat.je.rep.utilint.HostPortPair;
@@ -90,7 +75,6 @@ import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
-import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.DaemonThreadFactory;
@@ -200,6 +184,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
private volatile long _envSetupTimeoutMillis;
+ /** Flag set true when JE needs to discard transactions in order to rejoin the group */
+ private volatile boolean _nodeRolledback;
public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
{
@@ -410,7 +396,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private void tryToRestartEnvironment(final DatabaseException dbe)
{
- if (_state.compareAndSet(State.OPEN, State.RESTARTING))
+ if (_state.compareAndSet(State.OPEN, State.RESTARTING) || _state.compareAndSet(State.OPENING, State.RESTARTING))
{
if (dbe != null && LOGGER.isDebugEnabled())
{
@@ -1100,7 +1086,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
- envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+ envConfig.setExceptionListener(new ExceptionListener());
envConfig.setDurability(_defaultDurability);
for (Map.Entry<String, String> configItem : environmentParameters.entrySet())
@@ -1229,6 +1215,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
if (_replicationGroupListener.compareAndSet(null, replicationGroupListener))
{
notifyExistingRemoteReplicationNodes(replicationGroupListener);
+ notifyNodeRolledbackIfNecessary(replicationGroupListener);
}
else
{
@@ -1459,6 +1446,15 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ private void notifyNodeRolledbackIfNecessary(ReplicationGroupListener listener)
+ {
+ if (_nodeRolledback)
+ {
+ listener.onNodeRolledback();
+ _nodeRolledback = false;
+ }
+ }
+
private class RemoteNodeStateLearner implements Callable<Void>
{
private Map<String, ReplicatedEnvironment.State> _previousGroupState = Collections.emptyMap();
@@ -1491,7 +1487,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
executeDatabasePingerOnNodeChangesIfMaster(nodeStates, currentDesignatedPrimary, currentElectableGroupSizeOverride);
- notifyGroupListenerAboutNodeStates(nodeStates);
+ notifyGroupListenerAboutNodeStates(nodeStates);
}
}
}
@@ -1776,4 +1772,28 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
'}';
}
}
+
+ private class ExceptionListener implements com.sleepycat.je.ExceptionListener
+ {
+ @Override
+ public void exceptionThrown(final ExceptionEvent event)
+ {
+ if (event.getException() instanceof RollbackException)
+ {
+ // Usually caused by use of weak durability options: node priority zero,
+ // designated primary, electable group override.
+ RollbackException re = (RollbackException) event.getException();
+
+ LOGGER.warn(_prettyGroupNodeName + " has transaction(s) ahead of the current master. These"
+ + " must be discarded to allow this node to rejoin the group."
+ + " This condition is normally caused by the use of weak durability options.");
+ _nodeRolledback = true;
+ tryToRestartEnvironment(re);
+ }
+ else
+ {
+ LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException());
+ }
+ }
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java
index cf6050c3cd..896295708a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java
@@ -55,4 +55,9 @@ public interface ReplicationGroupListener
void onNoMajority();
+ /**
+ * Signifies that the node needs to discard one or more transactions in order to rejoin the group. Most likely
+ * caused by use of the weak durability options such as node priority zero.
+ */
+ void onNodeRolledback();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index db2e1277c7..39cac8fdc6 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -1108,6 +1108,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.QUORUM_LOST());
}
+ @Override
+ public void onNodeRolledback()
+ {
+ getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.NODE_ROLLEDBACK());
+ }
+
private Map<String, Object> nodeToAttributes(ReplicationNode replicationNode)
{
Map<String, Object> attributes = new HashMap<String, Object>();
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index bef5a87217..ed468c0844 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -33,6 +33,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.StringBinding;
+import com.sleepycat.je.DatabaseEntry;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -743,6 +746,91 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertTrue("Intruder node was not detected", intruderLatch.await(10, TimeUnit.SECONDS));
}
+ public void testNodeRolledback() throws Exception
+ {
+ DatabaseConfig createConfig = new DatabaseConfig();
+ createConfig.setAllowCreate(true);
+ createConfig.setTransactional(true);
+
+ ReplicatedEnvironmentFacade node1 = createMaster();
+
+ String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable();
+
+ String replicaName = TEST_NODE_NAME + 1;
+ ReplicatedEnvironmentFacade node2 = createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener());
+
+ node1.setDesignatedPrimary(true);
+
+ Transaction txn = node1.beginTransaction();
+ Database db = node1.getEnvironment().openDatabase(txn, "mydb", createConfig);
+ txn.commit();
+
+ // Put a record (that will be replicated)
+ putRecord(node1, db, 1, "value1");
+
+ node2.close();
+
+ // Put a record (that will be only on node1 as node2 is now offline)
+ putRecord(node1, db, 2, "value2");
+
+ db.close();
+
+ // Stop node1
+ node1.close();
+
+ // Restart the node2, making it primary so it becomes master
+ TestStateChangeListener node2StateChangeListener = new TestStateChangeListener(State.MASTER);
+ node2 = addNode(replicaName, replicaNodeHostPort, true, node2StateChangeListener, new NoopReplicationGroupListener());
+ boolean awaitForStateChange = node2StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS);
+ assertTrue(replicaName + " did not go into desired state; current actual state is "
+ + node2StateChangeListener.getCurrentActualState(), awaitForStateChange);
+
+ txn = node2.beginTransaction();
+ db = node2.getEnvironment().openDatabase(txn, "mydb", DatabaseConfig.DEFAULT);
+ txn.commit();
+
+ // Do a transaction on node2. The two environments will have diverged
+ putRecord(node2, db, 3, "diverged");
+
+ // Now restart node1 and ensure that it realises it needs to rollback before it can rejoin.
+ TestStateChangeListener node1StateChangeListener = new TestStateChangeListener(State.REPLICA);
+ final CountDownLatch _replicaRolledback = new CountDownLatch(1);
+ node1 = addNode(node1StateChangeListener, new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onNodeRolledback()
+ {
+ _replicaRolledback.countDown();
+ }
+ });
+ assertTrue("Node 1 did not go into desired state",
+ node1StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+ assertTrue("Node 1 did not experience rollback within timeout",
+ _replicaRolledback.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ // Finally do one more transaction through the master
+ putRecord(node2, db, 4, "value4");
+ db.close();
+
+ node1.close();
+ node2.close();
+ }
+
+ private void putRecord(final ReplicatedEnvironmentFacade master, final Database db, final int keyValue,
+ final String dataValue)
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+
+ Transaction txn = master.beginTransaction();
+ IntegerBinding.intToEntry(keyValue, key);
+ StringBinding.stringToEntry(dataValue, data);
+
+ db.put(txn, key, data);
+ txn.commit();
+ }
+
+
private void createIntruder(String nodeName, String node1NodeHostPort)
{
File environmentPathFile = new File(_storePath, nodeName);
@@ -883,5 +971,10 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
}
+ @Override
+ public void onNodeRolledback()
+ {
+ }
+
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java
index 2234ce6b74..ee29ef6796 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java
@@ -57,6 +57,7 @@ public class HighAvailabilityMessages
public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.deleted";
public static final String ROLE_CHANGED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.role_changed";
public static final String DESIGNATED_PRIMARY_CHANGED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.designated_primary_changed";
+ public static final String NODE_ROLLEDBACK_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.node_rolledback";
static
{
@@ -74,6 +75,7 @@ public class HighAvailabilityMessages
Logger.getLogger(DELETED_LOG_HIERARCHY);
Logger.getLogger(ROLE_CHANGED_LOG_HIERARCHY);
Logger.getLogger(DESIGNATED_PRIMARY_CHANGED_LOG_HIERARCHY);
+ Logger.getLogger(NODE_ROLLEDBACK_LOG_HIERARCHY);
_messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.HighAvailability_logmessages", _currentLocale);
}
@@ -479,6 +481,33 @@ public class HighAvailabilityMessages
};
}
+ /**
+ * Log a HighAvailability message of the Format:
+ * <pre>HA-1014 : Diverged transactions discarded</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage NODE_ROLLEDBACK()
+ {
+ String rawMessage = _messages.getString("NODE_ROLLEDBACK");
+
+ final String message = rawMessage;
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return NODE_ROLLEDBACK_LOG_HIERARCHY;
+ }
+ };
+ }
+
private HighAvailabilityMessages()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties
index 1a6bff5353..15e08bdc5a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties
@@ -61,4 +61,4 @@ PRIORITY_CHANGED = HA-1012 : Priority : {0}
# 0 - new value
DESIGNATED_PRIMARY_CHANGED = HA-1013 : Designated primary : {0}
-
+NODE_ROLLEDBACK = HA-1014 : Diverged transactions discarded