diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-05-09 15:33:17 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-05-09 15:33:17 +0000 |
| commit | 6b4d8ff004e42b3f7c22b5632f317e81f1589ce3 (patch) | |
| tree | 26ce3d66664e8dd9ea2f8f3edd61601396c31cf0 /java/systests/src | |
| parent | 1bb43f379eda76afca9df488cd9510f60ad89717 (diff) | |
| download | qpid-python-6b4d8ff004e42b3f7c22b5632f317e81f1589ce3.tar.gz | |
Merged revisions 536506 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r536506 | ritchiem | 2007-05-09 13:32:27 +0100 (Wed, 09 May 2007) | 10 lines
QPID-25 TimeToLive Basic implementation.
Messages are not automatically purged rather they are checked as they are selected for delivery. If they have expired they are dequeued.
AMQChannel - Update to call setExpiration on the message so the time can be adjusted if client & broker clocks are out of sync.
AMQMessage - Caches the _expiration time for internal use, adjusted for broker time. This leaves message headers unchanged so receiving client can see actual value requested by producer.
ConcurrentSelectorDeliveryManager - Updated to check for expired messages when getNextMessage is called. Immediate messages are NOT expired.
Subscription - Added method to getChannel that this Subscription is attatched to so we can retrieve the StoreContext for dequeue-ing the message.
TimeToLiveTest - Test of Time to live. Sends 50 msgs. 1 non-timed 48 1 second and 1 non-timed ensure only 2 msgs come back after 2 seconds
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@536567 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
3 files changed, 152 insertions, 1 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java index 5ddccb8a7b..dc1f592679 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java @@ -62,7 +62,7 @@ public class MaxChannelsTest extends TestCase { for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++) { - _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null)); + _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null, null)); } } catch (AMQException e) diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 1a0a341bbf..fe947ef3bc 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.AMQChannel; + import java.util.ArrayList; import java.util.List; import java.util.Queue; @@ -82,6 +84,10 @@ public class SubscriptionTestHelper implements Subscription return new Object(); } + public AMQChannel getChannel() + { + return null; + } public void queueDeleted(AMQQueue queue) { diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java new file mode 100644 index 0000000000..d8163759a6 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java @@ -0,0 +1,145 @@ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import junit.framework.Assert; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; +import org.apache.log4j.Logger; + +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.ConnectionFactory; +import javax.jms.Connection; +import javax.jms.Message; +import javax.naming.spi.InitialContextFactory; +import javax.naming.Context; +import java.util.Hashtable; + + +/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */ +public class TimeToLiveTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class); + + + protected final String BROKER = "vm://:1"; + protected final String VHOST = "/test"; + protected final String QUEUE = "TimeToLiveQueue"; + + private final long TIME_TO_LIVE = 1000L; + + Context _context; + + private Connection _clientConnection, _producerConnection; + + private MessageConsumer _consumer; + MessageProducer _producer; + Session _clientSession, _producerSession; + private static final int MSG_COUNT = 50; + + protected void setUp() throws Exception + { + if (BROKER.startsWith("vm://")) + { + TransportConnection.createVMBroker(1); + } + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'"); + env.put("queue.queue", QUEUE); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client 1 + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer = _clientSession.createConsumer(queue); + + //Create Producer + _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _producerConnection.start(); + + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _producer = _producerSession.createProducer(queue); + } + + protected void tearDown() throws Exception + { + _clientConnection.close(); + + _producerConnection.close(); + super.tearDown(); + + if (BROKER.startsWith("vm://")) + { + TransportConnection.killAllVMBrokers(); + } + } + + public void test() throws JMSException + { + //Set TTL + int msg = 0; + _producer.send(nextMessage(String.valueOf(msg), true)); + + _producer.setTimeToLive(TIME_TO_LIVE); + + for (; msg < MSG_COUNT - 2; msg++) + { + _producer.send(nextMessage(String.valueOf(msg), false)); + } + + //Reset TTL + _producer.setTimeToLive(0L); + _producer.send(nextMessage(String.valueOf(msg), false)); + + try + { + // Sleep to ensure TTL reached + Thread.sleep(2000); + } + catch (InterruptedException e) + { + + } + + _clientConnection.start(); + + //Receive Message 0 + Message received = _consumer.receive(100); + Assert.assertNotNull("First message not received", received); + Assert.assertTrue("First message doesn't have first set.", received.getBooleanProperty("first")); + Assert.assertEquals("First message has incorrect TTL.", 0L, received.getLongProperty("TTL")); + + + received = _consumer.receive(100); + Assert.assertNotNull("Final message not received", received); + Assert.assertFalse("Final message has first set.", received.getBooleanProperty("first")); + Assert.assertEquals("Final message has incorrect TTL.", 0L, received.getLongProperty("TTL")); + + received = _consumer.receive(100); + Assert.assertNull("More messages received", received); + } + + private Message nextMessage(String msg, boolean first) throws JMSException + { + Message send = _producerSession.createTextMessage("Message " + msg); + send.setBooleanProperty("first", first); + send.setLongProperty("TTL", _producer.getTimeToLive()); + return send; + } + + +} |
