summaryrefslogtreecommitdiff
path: root/java/systests/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-05-09 15:33:17 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-05-09 15:33:17 +0000
commit6b4d8ff004e42b3f7c22b5632f317e81f1589ce3 (patch)
tree26ce3d66664e8dd9ea2f8f3edd61601396c31cf0 /java/systests/src
parent1bb43f379eda76afca9df488cd9510f60ad89717 (diff)
downloadqpid-python-6b4d8ff004e42b3f7c22b5632f317e81f1589ce3.tar.gz
Merged revisions 536506 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r536506 | ritchiem | 2007-05-09 13:32:27 +0100 (Wed, 09 May 2007) | 10 lines QPID-25 TimeToLive Basic implementation. Messages are not automatically purged rather they are checked as they are selected for delivery. If they have expired they are dequeued. AMQChannel - Update to call setExpiration on the message so the time can be adjusted if client & broker clocks are out of sync. AMQMessage - Caches the _expiration time for internal use, adjusted for broker time. This leaves message headers unchanged so receiving client can see actual value requested by producer. ConcurrentSelectorDeliveryManager - Updated to check for expired messages when getNextMessage is called. Immediate messages are NOT expired. Subscription - Added method to getChannel that this Subscription is attatched to so we can retrieve the StoreContext for dequeue-ing the message. TimeToLiveTest - Test of Time to live. Sends 50 msgs. 1 non-timed 48 1 second and 1 non-timed ensure only 2 msgs come back after 2 seconds ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@536567 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java145
3 files changed, 152 insertions, 1 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
index 5ddccb8a7b..dc1f592679 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
@@ -62,7 +62,7 @@ public class MaxChannelsTest extends TestCase
{
for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
{
- _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null, null));
}
}
catch (AMQException e)
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 1a0a341bbf..fe947ef3bc 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.AMQChannel;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
@@ -82,6 +84,10 @@ public class SubscriptionTestHelper implements Subscription
return new Object();
}
+ public AMQChannel getChannel()
+ {
+ return null;
+ }
public void queueDeleted(AMQQueue queue)
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
new file mode 100644
index 0000000000..d8163759a6
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
@@ -0,0 +1,145 @@
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import junit.framework.Assert;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.log4j.Logger;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.ConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.naming.spi.InitialContextFactory;
+import javax.naming.Context;
+import java.util.Hashtable;
+
+
+/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */
+public class TimeToLiveTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class);
+
+
+ protected final String BROKER = "vm://:1";
+ protected final String VHOST = "/test";
+ protected final String QUEUE = "TimeToLiveQueue";
+
+ private final long TIME_TO_LIVE = 1000L;
+
+ Context _context;
+
+ private Connection _clientConnection, _producerConnection;
+
+ private MessageConsumer _consumer;
+ MessageProducer _producer;
+ Session _clientSession, _producerSession;
+ private static final int MSG_COUNT = 50;
+
+ protected void setUp() throws Exception
+ {
+ if (BROKER.startsWith("vm://"))
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
+ env.put("queue.queue", QUEUE);
+
+ _context = factory.getInitialContext(env);
+
+ Queue queue = (Queue) _context.lookup("queue");
+
+ //Create Client 1
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _consumer = _clientSession.createConsumer(queue);
+
+ //Create Producer
+ _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _producerConnection.start();
+
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _producer = _producerSession.createProducer(queue);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _clientConnection.close();
+
+ _producerConnection.close();
+ super.tearDown();
+
+ if (BROKER.startsWith("vm://"))
+ {
+ TransportConnection.killAllVMBrokers();
+ }
+ }
+
+ public void test() throws JMSException
+ {
+ //Set TTL
+ int msg = 0;
+ _producer.send(nextMessage(String.valueOf(msg), true));
+
+ _producer.setTimeToLive(TIME_TO_LIVE);
+
+ for (; msg < MSG_COUNT - 2; msg++)
+ {
+ _producer.send(nextMessage(String.valueOf(msg), false));
+ }
+
+ //Reset TTL
+ _producer.setTimeToLive(0L);
+ _producer.send(nextMessage(String.valueOf(msg), false));
+
+ try
+ {
+ // Sleep to ensure TTL reached
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+
+ _clientConnection.start();
+
+ //Receive Message 0
+ Message received = _consumer.receive(100);
+ Assert.assertNotNull("First message not received", received);
+ Assert.assertTrue("First message doesn't have first set.", received.getBooleanProperty("first"));
+ Assert.assertEquals("First message has incorrect TTL.", 0L, received.getLongProperty("TTL"));
+
+
+ received = _consumer.receive(100);
+ Assert.assertNotNull("Final message not received", received);
+ Assert.assertFalse("Final message has first set.", received.getBooleanProperty("first"));
+ Assert.assertEquals("Final message has incorrect TTL.", 0L, received.getLongProperty("TTL"));
+
+ received = _consumer.receive(100);
+ Assert.assertNull("More messages received", received);
+ }
+
+ private Message nextMessage(String msg, boolean first) throws JMSException
+ {
+ Message send = _producerSession.createTextMessage("Message " + msg);
+ send.setBooleanProperty("first", first);
+ send.setLongProperty("TTL", _producer.getTimeToLive());
+ return send;
+ }
+
+
+}