summaryrefslogtreecommitdiff
path: root/qpid/java/systests
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-10-10 10:22:21 +0000
committerAidan Skinner <aidan@apache.org>2008-10-10 10:22:21 +0000
commit389e40e6f2778b0ab8baeb1361adba75b9a37bcf (patch)
tree3b7d5acbd1e80871548cb0ef1e3d9ae3f8363223 /qpid/java/systests
parente3f45422ddec5cae3d1802e315baacb42af37e2f (diff)
downloadqpid-python-389e40e6f2778b0ab8baeb1361adba75b9a37bcf.tar.gz
QPID-1289: Make 0-8/0-9 client honour the max_preftech system property.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@703383 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java57
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java4
3 files changed, 61 insertions, 3 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
index e14efe03a7..74d3c5f1cb 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
@@ -26,6 +26,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.client.*;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
@@ -95,7 +96,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
FieldTable ft = new FieldTable();
ft.setString("F1000", "1");
- consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
+ consumer = consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT) /2 , false, false, (String) null, ft);
//force synch to ensure the consumer has resulted in a bound queue
//((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
index c91c27e894..55750dcafb 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
@@ -21,13 +21,19 @@
package org.apache.qpid.test.unit.client;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.TopicSession;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -187,6 +193,57 @@ public class AMQConnectionTest extends QpidTestCase
}
}
+ public void testPrefetchSystemProperty() throws Exception
+ {
+ String oldPrefetch = System.getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME);
+ try
+ {
+ _connection.close();
+ System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
+ _connection = (AMQConnection) getConnection();
+ _connection.start();
+ // Create two consumers on different sessions
+ Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerA = consSessA.createConsumer(_queue);
+
+ Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ // Send 3 messages
+ for (int i = 0; i < 3; i++)
+ {
+ producer.send(producerSession.createTextMessage(new Integer(i).toString()));
+ }
+ Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerB = consSessB.createConsumer(_queue);
+
+ Message msg;
+ // Check that one consumer has 2 messages
+ for (int i = 0; i < 2; i++)
+ {
+ msg = consumerA.receive(1500);
+ assertNotNull(msg);
+ assertEquals(new Integer(i).toString(), ((TextMessage) msg).getText());
+ }
+
+ msg = consumerA.receive(1500);
+ assertNull(msg);
+
+ // Check that other consumer has last message
+ msg = consumerB.receive(1500);
+ assertNotNull(msg);
+ assertEquals(new Integer(2).toString(), ((TextMessage) msg).getText());
+ }
+ finally
+ {
+ if (oldPrefetch == null)
+ {
+ oldPrefetch = ClientProperties.MAX_PREFETCH_DEFAULT;
+ }
+ System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, oldPrefetch);
+ }
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(AMQConnectionTest.class);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index 6fa0172ae3..7978e2c818 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -24,6 +24,7 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQHeadersExchange;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -76,8 +77,7 @@ public class StreamMessageTest extends QpidTestCase
FieldTable ft = new FieldTable();
ft.setString("F1000", "1");
MessageConsumer consumer =
- consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK,
- AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
+ consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft);
// force synch to ensure the consumer has resulted in a bound queue
// ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);