summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java17
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java66
2 files changed, 82 insertions, 1 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index b88e99c805..e16ed737af 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -165,7 +165,14 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
*/
put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
- put(ReplicationConfig.CONSISTENCY_POLICY, "TimeConsistencyPolicy(1 s,30 s)");
+ /**
+ * Allow Replica to proceed with transactions regardless of the state of a Replica
+ * At the moment we do not read or write databases on Replicas.
+ * Setting consistency policy to NoConsistencyRequiredPolicy
+ * would allow to create transaction on Replica immediately.
+ * Any followed write operation would fail with ReplicaWriteException.
+ */
+ put(ReplicationConfig.CONSISTENCY_POLICY, NoConsistencyRequiredPolicy.NAME);
}});
public static final String PERMITTED_NODE_LIST = "permittedNodes";
@@ -402,6 +409,14 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return new ConnectionScopedRuntimeException(String.format("Environment '%s' cannot finish JE operation because master is unknown", getNodeName()), dbe);
}
+ if (dbe instanceof ReplicaWriteException || dbe instanceof ReplicaConsistencyException)
+ {
+ // Master transited into Detached/Replica but underlying Configured Object has not been notified yet
+ // and attempted to perform JE operation.
+ // We need to abort any ongoing JE operation without halting the Broker or VHN/VH
+ return new ConnectionScopedRuntimeException(String.format("Environment '%s' cannot finish JE operation because node is not master", getNodeName()), dbe);
+ }
+
boolean restart = (noMajority || dbe instanceof RestartRequiredException);
if (restart)
{
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index c47307bdc0..b8c3b493bc 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -32,7 +32,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,6 +51,7 @@ import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.ReplicaWriteException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.ReplicationConfig;
@@ -822,6 +826,68 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
node2.close();
}
+ public void testReplicaTransactionBeginsImmediately() throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ String nodeName2 = TEST_NODE_NAME + "_2";
+ String host = "localhost";
+ int port = _portHelper.getNextAvailable();
+ String node2NodeHostPort = host + ":" + port;
+
+ final ReplicatedEnvironmentFacade replica = createReplica(nodeName2, node2NodeHostPort, new NoopReplicationGroupListener() );
+
+ // close the master
+ master.close();
+
+ // try to create a transaction in a separate thread
+ // and make sure that transaction is created immediately.
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ try
+ {
+
+ Future<Transaction> future = service.submit(new Callable<Transaction>(){
+
+ @Override
+ public Transaction call() throws Exception
+ {
+ return replica.getEnvironment().beginTransaction(null, null);
+ }
+ });
+ Transaction transaction = future.get(5, TimeUnit.SECONDS);
+ assertNotNull("Transaction was not created during expected time", transaction);
+ transaction.abort();
+ }
+ finally
+ {
+ service.shutdown();
+ }
+ }
+
+ public void testReplicaWriteExceptionIsConvertedIntoConnectionScopedRuntimeException() throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ String nodeName2 = TEST_NODE_NAME + "_2";
+ String host = "localhost";
+ int port = _portHelper.getNextAvailable();
+ String node2NodeHostPort = host + ":" + port;
+
+ final ReplicatedEnvironmentFacade replica = createReplica(nodeName2, node2NodeHostPort, new NoopReplicationGroupListener() );
+
+ // close the master
+ master.close();
+
+ try
+ {
+ replica.openDatabase("test", DatabaseConfig.DEFAULT.setAllowCreate(true) );
+ fail("Replica write operation should fail");
+ }
+ catch(ReplicaWriteException e)
+ {
+ RuntimeException handledException = master.handleDatabaseException("test", e);
+ assertTrue("Unexpected exception", handledException instanceof ConnectionScopedRuntimeException);
+ }
+ }
+
private void putRecord(final ReplicatedEnvironmentFacade master, final Database db, final int keyValue,
final String dataValue)
{