diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-10-10 10:22:21 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-10-10 10:22:21 +0000 |
| commit | 9e422621ddc3bf6fb8af271663deb3ac62ff72b0 (patch) | |
| tree | f3ee97de900faa0c1c84de25b23022662c2e4d42 /java/systests | |
| parent | 75c3d77f879c4e3175b249250f5cfbbff2480fbe (diff) | |
| download | qpid-python-9e422621ddc3bf6fb8af271663deb3ac62ff72b0.tar.gz | |
QPID-1289: Make 0-8/0-9 client honour the max_preftech system property.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703383 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests')
3 files changed, 61 insertions, 3 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index e14efe03a7..74d3c5f1cb 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -26,6 +26,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.NullApplicationRegistry; import org.apache.qpid.client.*; +import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; @@ -95,7 +96,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'")); FieldTable ft = new FieldTable(); ft.setString("F1000", "1"); - consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft); + consumer = consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT) /2 , false, false, (String) null, ft); //force synch to ensure the consumer has resulted in a bound queue //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java index c91c27e894..55750dcafb 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -21,13 +21,19 @@ package org.apache.qpid.test.unit.client; import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TextMessage; import javax.jms.TopicSession; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidTestCase; @@ -187,6 +193,57 @@ public class AMQConnectionTest extends QpidTestCase } } + public void testPrefetchSystemProperty() throws Exception + { + String oldPrefetch = System.getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME); + try + { + _connection.close(); + System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString()); + _connection = (AMQConnection) getConnection(); + _connection.start(); + // Create two consumers on different sessions + Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumerA = consSessA.createConsumer(_queue); + + Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(_queue); + + // Send 3 messages + for (int i = 0; i < 3; i++) + { + producer.send(producerSession.createTextMessage(new Integer(i).toString())); + } + Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumerB = consSessB.createConsumer(_queue); + + Message msg; + // Check that one consumer has 2 messages + for (int i = 0; i < 2; i++) + { + msg = consumerA.receive(1500); + assertNotNull(msg); + assertEquals(new Integer(i).toString(), ((TextMessage) msg).getText()); + } + + msg = consumerA.receive(1500); + assertNull(msg); + + // Check that other consumer has last message + msg = consumerB.receive(1500); + assertNotNull(msg); + assertEquals(new Integer(2).toString(), ((TextMessage) msg).getText()); + } + finally + { + if (oldPrefetch == null) + { + oldPrefetch = ClientProperties.MAX_PREFETCH_DEFAULT; + } + System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, oldPrefetch); + } + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(AMQConnectionTest.class); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 6fa0172ae3..7978e2c818 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -24,6 +24,7 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQHeadersExchange; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -76,8 +77,7 @@ public class StreamMessageTest extends QpidTestCase FieldTable ft = new FieldTable(); ft.setString("F1000", "1"); MessageConsumer consumer = - consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, - AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft); + consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft); // force synch to ensure the consumer has resulted in a bound queue // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); |
