diff options
Diffstat (limited to 'qpid/java')
2 files changed, 82 insertions, 1 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index b88e99c805..e16ed737af 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -165,7 +165,14 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan */ put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); - put(ReplicationConfig.CONSISTENCY_POLICY, "TimeConsistencyPolicy(1 s,30 s)"); + /** + * Allow Replica to proceed with transactions regardless of the state of a Replica + * At the moment we do not read or write databases on Replicas. + * Setting consistency policy to NoConsistencyRequiredPolicy + * would allow to create transaction on Replica immediately. + * Any followed write operation would fail with ReplicaWriteException. + */ + put(ReplicationConfig.CONSISTENCY_POLICY, NoConsistencyRequiredPolicy.NAME); }}); public static final String PERMITTED_NODE_LIST = "permittedNodes"; @@ -402,6 +409,14 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return new ConnectionScopedRuntimeException(String.format("Environment '%s' cannot finish JE operation because master is unknown", getNodeName()), dbe); } + if (dbe instanceof ReplicaWriteException || dbe instanceof ReplicaConsistencyException) + { + // Master transited into Detached/Replica but underlying Configured Object has not been notified yet + // and attempted to perform JE operation. + // We need to abort any ongoing JE operation without halting the Broker or VHN/VH + return new ConnectionScopedRuntimeException(String.format("Environment '%s' cannot finish JE operation because node is not master", getNodeName()), dbe); + } + boolean restart = (noMajority || dbe instanceof RestartRequiredException); if (restart) { diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index c47307bdc0..b8c3b493bc 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -32,7 +32,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -48,6 +51,7 @@ import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.Transaction; import com.sleepycat.je.rep.NodeState; +import com.sleepycat.je.rep.ReplicaWriteException; import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicatedEnvironment.State; import com.sleepycat.je.rep.ReplicationConfig; @@ -822,6 +826,68 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase node2.close(); } + public void testReplicaTransactionBeginsImmediately() throws Exception + { + ReplicatedEnvironmentFacade master = createMaster(); + String nodeName2 = TEST_NODE_NAME + "_2"; + String host = "localhost"; + int port = _portHelper.getNextAvailable(); + String node2NodeHostPort = host + ":" + port; + + final ReplicatedEnvironmentFacade replica = createReplica(nodeName2, node2NodeHostPort, new NoopReplicationGroupListener() ); + + // close the master + master.close(); + + // try to create a transaction in a separate thread + // and make sure that transaction is created immediately. + ExecutorService service = Executors.newSingleThreadExecutor(); + try + { + + Future<Transaction> future = service.submit(new Callable<Transaction>(){ + + @Override + public Transaction call() throws Exception + { + return replica.getEnvironment().beginTransaction(null, null); + } + }); + Transaction transaction = future.get(5, TimeUnit.SECONDS); + assertNotNull("Transaction was not created during expected time", transaction); + transaction.abort(); + } + finally + { + service.shutdown(); + } + } + + public void testReplicaWriteExceptionIsConvertedIntoConnectionScopedRuntimeException() throws Exception + { + ReplicatedEnvironmentFacade master = createMaster(); + String nodeName2 = TEST_NODE_NAME + "_2"; + String host = "localhost"; + int port = _portHelper.getNextAvailable(); + String node2NodeHostPort = host + ":" + port; + + final ReplicatedEnvironmentFacade replica = createReplica(nodeName2, node2NodeHostPort, new NoopReplicationGroupListener() ); + + // close the master + master.close(); + + try + { + replica.openDatabase("test", DatabaseConfig.DEFAULT.setAllowCreate(true) ); + fail("Replica write operation should fail"); + } + catch(ReplicaWriteException e) + { + RuntimeException handledException = master.handleDatabaseException("test", e); + assertTrue("Unexpected exception", handledException instanceof ConnectionScopedRuntimeException); + } + } + private void putRecord(final ReplicatedEnvironmentFacade master, final Database db, final int keyValue, final String dataValue) { |
