diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-05-18 23:05:36 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-05-18 23:05:36 +0000 |
| commit | 495a47e8feeb3cf3110ed9b972775fa40f58be00 (patch) | |
| tree | 2bdb1ade7d0cf67f8fde7f211f7bccc9348fb044 /java/client/src | |
| parent | 27920d16e70f590b49548877a2129a1d2162d985 (diff) | |
| download | qpid-python-495a47e8feeb3cf3110ed9b972775fa40f58be00.tar.gz | |
Implemented the feature described in QPID-2515
However a few issues needs to be ironed out - see the JIRA for these issues.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@945945 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 32 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 40 |
2 files changed, 55 insertions, 17 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 5344f77539..4aca7454bd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -142,7 +142,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * USed to store the range of in tx messages */ private RangeSet _txRangeSet = new RangeSet(); - private int _txSize = 0; + private int _txSize = 0; //--- constructors /** @@ -560,6 +560,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic throws AMQException, FailoverException { boolean preAcquire; + + long capacity = getCapacity(consumer.getDestination()); + try { preAcquire = ( ! consumer.isNoConsume() && @@ -578,7 +581,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); - if (! prefetch()) + if (capacity == 0) { getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT); } @@ -589,12 +592,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE); - if(prefetch() && _dispatcher != null && (isStarted() || _immediatePrefetch)) + if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch(), + capacity, Option.UNRELIABLE); } @@ -604,6 +607,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getCurrentException(); } } + + private long getCapacity(AMQDestination destination) + { + long capacity = 0; + if (destination.getDestSyntax() == DestSyntax.ADDR && + destination.getSourceLink().getCapacity() > 0) + { + capacity = destination.getSourceLink().getCapacity(); + } + else if (prefetch()) + { + capacity = getAMQConnection().getMaxPrefetch(); + } + return capacity; + } /** * Create an 0_10 message producer @@ -744,7 +762,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic //only set if msg list is null try { - if (! prefetch()) + long capacity = getCapacity(consumer.getDestination()); + + if (capacity == 0) { if (consumer.getMessageListener() != null) { @@ -757,7 +777,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { getQpidSession() .messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch(), + capacity, Option.UNRELIABLE); } getQpidSession() diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 1b2d6876bd..321f5855d7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -19,6 +19,7 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; @@ -72,7 +73,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ private final AtomicBoolean _syncReceive = new AtomicBoolean(false); private String _consumerTagString; - + + private long capacity = 0; + //--- constructor protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, @@ -100,6 +103,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } _isStarted = connection.started(); + + // Destination setting overrides connection defaults + if (destination.getDestSyntax() == DestSyntax.ADDR && + destination.getSourceLink().getCapacity() > 0) + { + capacity = destination.getSourceLink().getCapacity(); + } + else if (getSession().prefetch()) + { + capacity = _0_10session.getAMQConnection().getMaxPrefetch(); + } + } @@ -146,7 +161,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } if (messageOk) { - if (isMessageListenerSet() && ! getSession().prefetch()) + if (isMessageListenerSet() && capacity == 0) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, @@ -245,7 +260,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } // if we are syncrhonously waiting for a message // and messages are not prefetched we then need to request another one - if(! getSession().prefetch()) + if(capacity == 0) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, @@ -333,7 +348,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM public void setMessageListener(final MessageListener messageListener) throws JMSException { super.setMessageListener(messageListener); - if (messageListener != null && ! getSession().prefetch()) + if (messageListener != null && capacity == 0) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, @@ -372,11 +387,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ public Object getMessageFromQueue(long l) throws InterruptedException { - if (! getSession().prefetch()) + if (capacity == 0) { _syncReceive.set(true); } - if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty()) + if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, @@ -385,23 +400,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM Object o = super.getMessageFromQueue(l); if (o == null && _0_10session.isStarted()) { + _0_10session.getQpidSession().messageFlush (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC); _0_10session.getQpidSession().sync(); _0_10session.getQpidSession().messageFlow (getConsumerTagString(), MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE); - if (getSession().prefetch()) + + if (capacity > 0) { _0_10session.getQpidSession().messageFlow - (getConsumerTagString(), MessageCreditUnit.MESSAGE, - _0_10session.getAMQConnection().getMaxPrefetch(), - Option.UNRELIABLE); + (getConsumerTagString(), + MessageCreditUnit.MESSAGE, + capacity, + Option.UNRELIABLE); } _0_10session.syncDispatchQueue(); o = super.getMessageFromQueue(-1); } - if (! getSession().prefetch()) + if (capacity == 0) { _syncReceive.set(false); } |
