summaryrefslogtreecommitdiff
path: root/java/systests
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java57
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java4
3 files changed, 61 insertions, 3 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
index e14efe03a7..74d3c5f1cb 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
@@ -26,6 +26,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.client.*;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
@@ -95,7 +96,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
FieldTable ft = new FieldTable();
ft.setString("F1000", "1");
- consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
+ consumer = consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT) /2 , false, false, (String) null, ft);
//force synch to ensure the consumer has resulted in a bound queue
//((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
index c91c27e894..55750dcafb 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
@@ -21,13 +21,19 @@
package org.apache.qpid.test.unit.client;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.TopicSession;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -187,6 +193,57 @@ public class AMQConnectionTest extends QpidTestCase
}
}
+ public void testPrefetchSystemProperty() throws Exception
+ {
+ String oldPrefetch = System.getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME);
+ try
+ {
+ _connection.close();
+ System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
+ _connection = (AMQConnection) getConnection();
+ _connection.start();
+ // Create two consumers on different sessions
+ Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerA = consSessA.createConsumer(_queue);
+
+ Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ // Send 3 messages
+ for (int i = 0; i < 3; i++)
+ {
+ producer.send(producerSession.createTextMessage(new Integer(i).toString()));
+ }
+ Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerB = consSessB.createConsumer(_queue);
+
+ Message msg;
+ // Check that one consumer has 2 messages
+ for (int i = 0; i < 2; i++)
+ {
+ msg = consumerA.receive(1500);
+ assertNotNull(msg);
+ assertEquals(new Integer(i).toString(), ((TextMessage) msg).getText());
+ }
+
+ msg = consumerA.receive(1500);
+ assertNull(msg);
+
+ // Check that other consumer has last message
+ msg = consumerB.receive(1500);
+ assertNotNull(msg);
+ assertEquals(new Integer(2).toString(), ((TextMessage) msg).getText());
+ }
+ finally
+ {
+ if (oldPrefetch == null)
+ {
+ oldPrefetch = ClientProperties.MAX_PREFETCH_DEFAULT;
+ }
+ System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, oldPrefetch);
+ }
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(AMQConnectionTest.class);
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index 6fa0172ae3..7978e2c818 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -24,6 +24,7 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQHeadersExchange;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -76,8 +77,7 @@ public class StreamMessageTest extends QpidTestCase
FieldTable ft = new FieldTable();
ft.setString("F1000", "1");
MessageConsumer consumer =
- consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK,
- AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
+ consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft);
// force synch to ensure the consumer has resulted in a bound queue
// ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);