From 495a47e8feeb3cf3110ed9b972775fa40f58be00 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Tue, 18 May 2010 23:05:36 +0000 Subject: Implemented the feature described in QPID-2515 However a few issues needs to be ironed out - see the JIRA for these issues. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@945945 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQSession_0_10.java | 32 ++++++++-- .../qpid/client/BasicMessageConsumer_0_10.java | 40 +++++++++---- .../destination/AddressBasedDestinationTest.java | 69 ++++++++++++++++++++-- 3 files changed, 118 insertions(+), 23 deletions(-) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 5344f77539..4aca7454bd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -142,7 +142,7 @@ public class AMQSession_0_10 extends AMQSession 0 && _dispatcher != null && (isStarted() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch(), + capacity, Option.UNRELIABLE); } @@ -604,6 +607,21 @@ public class AMQSession_0_10 extends AMQSession 0) + { + capacity = destination.getSourceLink().getCapacity(); + } + else if (prefetch()) + { + capacity = getAMQConnection().getMaxPrefetch(); + } + return capacity; + } /** * Create an 0_10 message producer @@ -744,7 +762,9 @@ public class AMQSession_0_10 extends AMQSession 0) + { + capacity = destination.getSourceLink().getCapacity(); + } + else if (getSession().prefetch()) + { + capacity = _0_10session.getAMQConnection().getMaxPrefetch(); + } + } @@ -146,7 +161,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer 0) { _0_10session.getQpidSession().messageFlow - (getConsumerTagString(), MessageCreditUnit.MESSAGE, - _0_10session.getAMQConnection().getMaxPrefetch(), - Option.UNRELIABLE); + (getConsumerTagString(), + MessageCreditUnit.MESSAGE, + capacity, + Option.UNRELIABLE); } _0_10session.syncDispatchQueue(); o = super.getMessageFromQueue(-1); } - if (! getSession().prefetch()) + if (capacity == 0) { _syncReceive.set(false); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index fbc33a037b..7edaa780df 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -21,8 +21,12 @@ package org.apache.qpid.test.client.destination; */ +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import javax.jms.Connection; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; @@ -37,8 +41,6 @@ import org.apache.qpid.test.utils.QpidTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import edu.emory.mathcs.backport.java.util.Collections; - public class AddressBasedDestinationTest extends QpidTestCase { private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class); @@ -48,7 +50,7 @@ public class AddressBasedDestinationTest extends QpidTestCase public void setUp() throws Exception { super.setUp(); - _connection = getConnection(); + _connection = getConnection() ; _connection.start(); } @@ -211,6 +213,7 @@ public class AddressBasedDestinationTest extends QpidTestCase "}, " + "x-bindings: [{exchange : 'amq.direct', key : test}, " + "{exchange : 'amq.fanout'}," + + "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," + "{exchange : 'amq.topic', key : 'a.#'}" + "]," + @@ -236,7 +239,15 @@ public class AddressBasedDestinationTest extends QpidTestCase assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", - dest.getAddressName(),"a.#", null)); + dest.getAddressName(),"a.#", null)); + + Map args = new HashMap(); + args.put("x-match","any"); + args.put("dep","sales"); + args.put("loc","CA"); + assertTrue("Queue not bound as expected",( + (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + dest.getAddressName(),null, args)); } @@ -273,8 +284,7 @@ public class AddressBasedDestinationTest extends QpidTestCase // The existence of the queue is implicitly tested here assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", - dest.getQueueName(),"hello", Collections.emptyMap())); - + dest.getQueueName(),"hello", Collections.emptyMap())); } public void testBindQueueWithArgs() throws Exception @@ -331,6 +341,53 @@ public class AddressBasedDestinationTest extends QpidTestCase dest.getAddressName(),null, a.getOptions())); } + /** + * Test goal: Verifies the capacity property in address string is handled properly. + * Test strategy: + * Creates a destination with capacity 10. + * Creates consumer with client ack. + * Sends 15 messages to the queue, tries to receive 10. + * Tries to receive the 11th message and checks if its null. + * + * Since capacity is 10 and we haven't acked any messages, + * we should not have received the 11th. + * + * Acks the 10th message and verifies we receive the rest of the msgs. + */ + public void testLinkCapacity() throws Exception + { + if (!isCppBroker()) + { + _logger.info("Not C++ broker, exiting test"); + return; + } + + Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); + + String addr = "ADDR:my-queue; {create: always, link:{capacity: 10}}"; + AMQDestination dest = new AMQAnyDestination(addr); + MessageConsumer cons = jmsSession.createConsumer(dest); + MessageProducer prod = jmsSession.createProducer(dest); + + for (int i=0; i< 15; i++) + { + prod.send(jmsSession.createTextMessage("msg" + i) ); + } + + for (int i=0; i< 9; i++) + { + cons.receive(); + } + Message msg = cons.receive(RECEIVE_TIMEOUT); + assertNotNull("Should have received the 10th message",msg); + assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT)); + msg.acknowledge(); + for (int i=11; i<16; i++) + { + assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT)); + } + } + /*public void testBindQueueForXMLExchange() throws Exception { if (!isCppBroker()) -- cgit v1.2.1