diff options
| author | Robert Gemmell <robbie@apache.org> | 2010-02-24 16:56:30 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2010-02-24 16:56:30 +0000 |
| commit | 7c8794a89ad4dbf04017918bdb5331623d0ce957 (patch) | |
| tree | 9c7a17ef790460de844653aaa3d4190e5e7fc5d8 /java | |
| parent | 537ea5882ed0d6afd0202a3c793d04327470fc27 (diff) | |
| download | qpid-python-7c8794a89ad4dbf04017918bdb5331623d0ce957.tar.gz | |
QPID-2417: add TTL testing of durable topic subscription
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@915867 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java | 155 |
1 files changed, 155 insertions, 0 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java index c73959676d..e14d4011a3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java @@ -28,12 +28,16 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.TopicSubscriber; import junit.framework.Assert; import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.test.utils.QpidTestCase; import java.util.concurrent.locks.ReentrantLock; @@ -154,6 +158,7 @@ public class TimeToLiveTest extends QpidTestCase { Message send = producerSession.createTextMessage("Message " + msg); send.setBooleanProperty("first", first); + send.setStringProperty("testprop", "TimeToLiveTest"); send.setLongProperty("TTL", producer.getTimeToLive()); return send; } @@ -206,5 +211,155 @@ public class TimeToLiveTest extends QpidTestCase producerSession.close(); producerConnection.close(); } + + public void testPassiveTTLwithDurableSubscription() throws Exception + { + //Create Client 1 + Connection clientConnection = getConnection(); + + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create and close the durable subscriber + AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName()); + TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false); + durableSubscriber.close(); + + //Create Producer + Connection producerConnection = getConnection(); + + producerConnection.start(); + + // Move to a Transacted session to ensure that all messages have been delivered to broker before + // we start waiting for TTL + Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer producer = producerSession.createProducer(topic); + + //Set TTL + int msg = 0; + producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer)); + + producer.setTimeToLive(TIME_TO_LIVE); + + for (; msg < MSG_COUNT - 2; msg++) + { + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); + } + + //Reset TTL + producer.setTimeToLive(0L); + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); + + producerSession.commit(); + + //resubscribe + durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName()); + + // Ensure we sleep the required amount of time. + ReentrantLock waitLock = new ReentrantLock(); + Condition wait = waitLock.newCondition(); + final long MILLIS = 1000000L; + + long waitTime = TIME_TO_LIVE * MILLIS; + while (waitTime > 0) + { + try + { + waitLock.lock(); + + waitTime = wait.awaitNanos(waitTime); + } + catch (InterruptedException e) + { + //Stop if we are interrupted + fail(e.getMessage()); + } + finally + { + waitLock.unlock(); + } + + } + + clientConnection.start(); + + //Receive Message 0 + // Set 5s receive time for messages we expect to receive. + Message receivedFirst = durableSubscriber.receive(5000); + Message receivedSecond = durableSubscriber.receive(5000); + Message receivedThird = durableSubscriber.receive(1000); + + // Log the messages to help diagnosis incase of failure + _logger.info("First:"+receivedFirst); + _logger.info("Second:"+receivedSecond); + _logger.info("Third:"+receivedThird); + + // Only first and last messages sent should survive expiry + Assert.assertNull("More messages received", receivedThird); + + Assert.assertNotNull("First message not received", receivedFirst); + Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first")); + Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL")); + + Assert.assertNotNull("Final message not received", receivedSecond); + Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first")); + Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL")); + + clientConnection.close(); + + producerConnection.close(); + } + + public void testActiveTTLwithDurableSubscription() throws Exception + { + //Create Client 1 + Connection clientConnection = getConnection(); + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create and close the durable subscriber + AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName()); + TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, "MyDurableTTLSubscription","testprop='TimeToLiveTest'", false); + durableSubscriber.close(); + + //Create Producer + Connection producerConnection = getConnection(); + AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(topic); + producer.setTimeToLive(1000L); + + // send Messages + for(int i = 0; i < MSG_COUNT; i++) + { + producer.send(producerSession.createTextMessage("Message: "+i)); + } + long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; + + // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms. + long messageCount = MSG_COUNT; + long lastPass; + AMQQueue subcriptionQueue = new AMQQueue("amq.topic","clientid" + ":" + "MyDurableTTLSubscription"); + do + { + lastPass = messageCount; + Thread.sleep(100); + messageCount = producerSession.getQueueDepth((AMQDestination) subcriptionQueue); + + // If we have received messages in the last loop then extend the timeout time. + // if we get messages stuck that are not expiring then the failureTime will occur + // failing the test. This will help with the scenario when the broker does not + // have enough CPU cycles to process the TTLs. + if (lastPass != messageCount) + { + failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; + } + } + while(messageCount > 0L && System.currentTimeMillis() < failureTime); + + assertEquals("Messages not automatically expired: ", 0L, messageCount); + + producer.close(); + producerSession.close(); + producerConnection.close(); + } } |
