diff options
| author | Keith Wall <kwall@apache.org> | 2014-10-29 09:26:37 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-10-29 09:26:37 +0000 |
| commit | cbb9d1e220351d5da2fbbdad27430e334c352e07 (patch) | |
| tree | 71a0e5fabab66a9154757be1d3a04fece9a592b6 | |
| parent | 72e83c3a33af3567191c665e876fb16be1483e4d (diff) | |
| download | qpid-python-cbb9d1e220351d5da2fbbdad27430e334c352e07.tar.gz | |
QPID-6192: [Java Broker] Add supporting test case guarding case when failover occurs when busy
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1635076 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java index 3331a8a665..a2242475eb 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java @@ -57,6 +57,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** * Test suite to test all possible failover corner cases @@ -713,6 +714,106 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE); } + public void testKillBrokerFailoverWhilstPublishingInFlight() throws Exception + { + doFailoverWhilstPublishingInFlight(true); + } + + public void testStopBrokerFailoverWhilstPublishingInFlight() throws Exception + { + doFailoverWhilstPublishingInFlight(false); + } + + private void doFailoverWhilstPublishingInFlight(boolean hardKill) throws JMSException, InterruptedException + { + init(Session.SESSION_TRANSACTED, false); + + final int numberOfMessages = 200; + + final CountDownLatch halfWay = new CountDownLatch(1); + final CountDownLatch allDone = new CountDownLatch(1); + final AtomicReference<Exception> exception = new AtomicReference<>(); + + Runnable producerRunnable = new Runnable() + { + @Override + public void run() + { + Thread.currentThread().setName("ProducingThread"); + + try + { + for(int i=0; i< numberOfMessages; i++) + { + boolean success = false; + while(!success) + { + try + { + Message message = _producerSession.createMessage(); + message.setIntProperty("msgNum", i); + _producer.send(message); + _producerSession.commit(); + success = true; + } + catch (javax.jms.IllegalStateException e) + { + // fail - failover should not leave a JMS object in an illegal state + throw e; + } + catch (JMSException e) + { + // OK we will be failing over + _logger.debug("Got JMS exception, probably just failing over", e); + } + } + + if (i > numberOfMessages / 2 && halfWay.getCount() == 1) + { + halfWay.countDown(); + } + } + + allDone.countDown(); + } + catch (Exception e) + { + exception.set(e); + } + } + }; + + Thread producerThread = new Thread(producerRunnable); + producerThread.start(); + + assertTrue("Didn't get to half way within timeout", halfWay.await(30000, TimeUnit.MILLISECONDS)); + + if (hardKill) + { + _logger.debug("Killing the Broker"); + killBroker(getFailingPort()); + } + else + { + _logger.debug("Stopping the Broker"); + stopBroker(getFailingPort()); + } + + if (exception.get() != null) + { + _logger.error("Unexpected exception from producer thread", exception.get()); + } + assertNull("Producer thread should not have got an exception", exception.get()); + + assertTrue("All producing work was not completed", allDone.await(30000, TimeUnit.MILLISECONDS)); + + producerThread.join(30000); + + // Extra work to prove the session still okay + assertNotNull(_producerSession.createTemporaryQueue()); + } + + private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException { setDelayedFailoverPolicy(5); |
