summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java44
1 files changed, 25 insertions, 19 deletions
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
index 03d5e72ea3..1e2169d3a2 100644
--- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
+++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
@@ -35,7 +35,6 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
-import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import com.sleepycat.je.Durability;
@@ -272,17 +271,19 @@ public class MultiNodeTest extends QpidBrokerTestCase
public void testTransferMasterWhilstMessagesInFlight() throws Exception
{
final Connection connection = getConnection(_positiveFailoverUrl);
- connection.start();
((AMQConnection) connection).setConnectionListener(_failoverListener);
- final AtomicBoolean masterTransfered = new AtomicBoolean(false);
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ final Destination destination = session.createQueue(getTestQueueName());
+ session.createConsumer(destination).close();
+
+ final AtomicBoolean masterTransferred = new AtomicBoolean(false);
final AtomicBoolean keepRunning = new AtomicBoolean(true);
final AtomicReference<Exception> workerException = new AtomicReference<>();
final CountDownLatch producedOneBefore = new CountDownLatch(1);
final CountDownLatch producedOneAfter = new CountDownLatch(1);
final CountDownLatch workerShutdown = new CountDownLatch(1);
-
Runnable producer = new Runnable()
{
@Override
@@ -291,10 +292,7 @@ public class MultiNodeTest extends QpidBrokerTestCase
try
{
int count = 0;
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Destination destination = session.createQueue(getTestQueueName());
MessageProducer producer = session.createProducer(destination);
- session.createConsumer(destination).close();
while (keepRunning.get())
{
@@ -305,23 +303,27 @@ public class MultiNodeTest extends QpidBrokerTestCase
producer.send(message);
session.commit();
LOGGER.debug("Sent message " + count);
+
+ producedOneBefore.countDown();
+
+ if (masterTransferred.get())
+ {
+ producedOneAfter.countDown();
+ }
+ count++;
}
- catch (TransactionRolledBackException trbe)
+ catch (javax.jms.IllegalStateException ise)
{
-
+ throw ise;
}
- catch(JMSException je)
+ catch (TransactionRolledBackException trbe)
{
-
+ // Pass - failover in prgoress
}
-
- producedOneBefore.countDown();
-
- if (masterTransfered.get())
+ catch(JMSException je)
{
- producedOneAfter.countDown();
+ // Pass - failover in progress
}
- count++;
}
}
catch (Exception e)
@@ -353,16 +355,18 @@ public class MultiNodeTest extends QpidBrokerTestCase
assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
_groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
- masterTransfered.set(true);
_failoverListener.awaitFailoverCompletion(20000);
LOGGER.info("Failover has finished");
attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
- assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+ assertEquals("New master has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
_groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+ LOGGER.info("Master transfer known to have completed successfully.");
+ masterTransferred.set(true);
+
boolean producedMore = producedOneAfter.await(5000, TimeUnit.MILLISECONDS);
assertTrue("Should have successfully produced at least one message after transfer complete", producedMore);
@@ -373,6 +377,8 @@ public class MultiNodeTest extends QpidBrokerTestCase
backgroundWorker.join(5000);
assertNull(workerException.get());
+ assertNotNull(session.createTemporaryQueue());
+
}
public void testQuorumOverride() throws Exception