diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2008-07-31 13:38:00 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-07-31 13:38:00 +0000 |
| commit | 272b42db45a47a952a8343a08edda44a6984037a (patch) | |
| tree | cf41ccd14d5e539c0f5f25d01b8437bca31153d9 | |
| parent | 9fb5fd7a0a800591c334bde2b9556e984217d7de (diff) | |
| download | qpid-python-272b42db45a47a952a8343a08edda44a6984037a.tar.gz | |
qpid-1163: Added test for qpid-1163 (Note: I have checked that this test did not pass before r673074)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@681367 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java | 80 |
1 files changed, 73 insertions, 7 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index b2797e2535..9c755fcb41 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -25,13 +25,9 @@ import org.apache.qpid.client.AMQConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; +import javax.jms.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * This class tests a number of commits and roll back scenarios @@ -503,4 +499,74 @@ public class CommitRollbackTest extends QpidTestCase _session.commit(); } + /** + * Qpid-1163 + * Check that when commt is called inside onMessage then + * the last message is nor redelivered. + * + * @throws Exception + */ + public void testCommitWhithinOnMessage() throws Exception + { + Queue queue = (Queue) getInitialContext().lookup("queue"); + // create a consumer + MessageConsumer cons = _session.createConsumer(queue); + MessageProducer prod = _session.createProducer(queue); + Message message = _session.createTextMessage("Message"); + message.setJMSCorrelationID("m1"); + prod.send(message); + _session.commit(); + _logger.info("Sent message to queue"); + CountDownLatch cd = new CountDownLatch(1); + cons.setMessageListener(new CommitWhithinOnMessageListener(cd)); + conn.start(); + cd.await(30, TimeUnit.SECONDS); + if( cd.getCount() > 0 ) + { + fail("Did not received message"); + } + // Check that the message has been dequeued + _session.close(); + conn.close(); + conn = (AMQConnection) getConnection("guest", "guest"); + conn.start(); + Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + cons = session.createConsumer(queue); + message = cons.receiveNoWait(); + if(message != null) + { + if(message.getJMSCorrelationID().equals("m1")) + { + fail("received message twice"); + } + else + { + fail("queue should have been empty, received message: " + message); + } + } + } + + private class CommitWhithinOnMessageListener implements MessageListener + { + private CountDownLatch _cd; + private CommitWhithinOnMessageListener(CountDownLatch cd) + { + _cd = cd; + } + public void onMessage(Message message) + { + try + { + _logger.info("received message " + message); + assertEquals("Wrong message received", message.getJMSCorrelationID(), "m1"); + _logger.info("commit session"); + _session.commit(); + _cd.countDown(); + } + catch (JMSException e) + { + e.printStackTrace(); + } + } + } } |
