diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-04-29 18:06:35 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-04-29 18:06:35 +0000 |
| commit | 9bb15794d3d7f6888b2e4742e8216cd687ec830a (patch) | |
| tree | 31debb024272d9e2908be00d8a6bfdda33a638a9 /java/systests/src | |
| parent | fa66203eb87d8d4b0866f1af9488c8802078b47e (diff) | |
| download | qpid-python-9bb15794d3d7f6888b2e4742e8216cd687ec830a.tar.gz | |
QPID-2471
Added two test cases to verify ordering while using recover with sync and async consumer.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@939410 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
| -rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java | 141 |
1 files changed, 140 insertions, 1 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 4a123cb1dc..50cb1ae9d7 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -27,6 +27,9 @@ import org.apache.qpid.test.utils.FailoverBaseCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -34,6 +37,9 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.TextMessage; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; public class RecoverTest extends FailoverBaseCase @@ -90,6 +96,7 @@ public class RecoverTest extends FailoverBaseCase while (index != SENT_COUNT) { message = _consumer.receive(3000); + assertNotNull(message); assertEquals(index++, message.getIntProperty(INDEX)); } @@ -142,9 +149,11 @@ public class RecoverTest extends FailoverBaseCase _consumerSession.recover(); Message message2 = _consumer.receive(3000); + assertNotNull(message2); assertEquals(2, message2.getIntProperty(INDEX)); Message message3 = _consumer.receive(3000); + assertNotNull(message3); assertEquals(3, message3.getIntProperty(INDEX)); _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message"); @@ -155,6 +164,7 @@ public class RecoverTest extends FailoverBaseCase _consumerSession.recover(); message3 = _consumer.receive(3000); + assertNotNull(message3); assertEquals(3, message3.getIntProperty(INDEX)); ((org.apache.qpid.jms.Message) message3).acknowledgeThis(); @@ -194,7 +204,6 @@ public class RecoverTest extends FailoverBaseCase assertEquals("msg2", tm2.getText()); tm2.acknowledge(); - consumerSession.recover(); TextMessage tm1 = (TextMessage) consumer.receive(2000); @@ -303,4 +312,134 @@ public class RecoverTest extends FailoverBaseCase { _error = e; } + + private void sendMessages(javax.jms.Session session,Destination dest,int count) throws Exception + { + MessageProducer prod = session.createProducer(dest); + for (int i=0; i<count; i++) + { + prod.send(session.createTextMessage("Msg" + i)); + } + prod.close(); + } + + /** + * Test strategy + * Send 8 messages to a topic. + * The consumer will call recover until it sees a message 5 times, + * at which point it will ack that message. + * It will continue the above until it acks all the messages. + * While doing so it will verify that the messages are not + * delivered out of order. + */ + public void testOderingWithSyncConsumer() throws Exception + { + Connection con = (Connection) getConnection("guest", "guest"); + javax.jms.Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination topic = session.createTopic("myTopic"); + MessageConsumer cons = session.createConsumer(topic); + + sendMessages(session,topic,8); + con.start(); + + int messageSeen = 0; + int expectedMsg = 0; + + while(expectedMsg < 8) + { + Message message = cons.receive(); + String text=((TextMessage) message).getText(); + + assertEquals("Received Message Out Of Order","Msg"+expectedMsg,text); + + //don't ack the message until we receive it 5 times + if( messageSeen < 5 ) + { + _logger.debug("Ignoring message " + text + " and calling recover"); + session.recover(); + messageSeen++; + } + else + { + messageSeen = 0; + expectedMsg++; + message.acknowledge(); + _logger.debug("Acknowledging message " + text); + } + } + } + + /** + * Test strategy + * Same as testOderingWithSyncConsumer but using a + * Message Listener instead of a sync receive(). + */ + public void testOderingWithAsyncConsumer() throws Exception + { + Connection con = (Connection) getConnection("guest", "guest"); + final javax.jms.Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination topic = session.createTopic("myTopic"); + MessageConsumer cons = session.createConsumer(topic); + + sendMessages(session,topic,8); + con.start(); + + final Object lock = new Object(); + final AtomicBoolean pass = new AtomicBoolean(false); //used as work around for 'final' + cons.setMessageListener(new MessageListener() + { + int messageSeen = 0; + int expectedMsg = 0; + + public void onMessage(Message message) + { + try + { + String text = ((TextMessage) message).getText(); + assertEquals("Received Message Out Of Order","Msg"+expectedMsg,text); + + //don't ack the message until we receive it 5 times + if( messageSeen < 5 ) + { + _logger.debug("Ignoring message " + text + " and calling recover"); + session.recover(); + messageSeen++; + } + else + { + messageSeen = 0; + expectedMsg++; + message.acknowledge(); + _logger.debug("Acknowledging message " + text); + if (expectedMsg == 8) + { + pass.set(true); + synchronized (lock) + { + lock.notifyAll(); + } + } + } + } + catch (JMSException e) + { + fail("Exception : " + e.getMessage()); + synchronized (lock) + { + lock.notifyAll(); + } + } + } + }); + + synchronized(lock) + { + lock.wait(5000); + } + + if (!pass.get()) + { + fail("Test did not complete on time. Please check the logs"); + } + } } |
