diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-09-23 13:00:05 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-09-23 13:00:05 +0000 |
| commit | fbc287bd02a6bb91b709b798abfb859d8223fc43 (patch) | |
| tree | 0632659ef806e8eace91234ba3e0dd3323a26a83 /qpid/java/client | |
| parent | 61f7f00e14a5ef25f01bac4a14611f9598540c80 (diff) | |
| download | qpid-python-fbc287bd02a6bb91b709b798abfb859d8223fc43.tar.gz | |
QPID-3678 : [Java Client] Add support for setting link capacity to zero in ADDR addresses
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1626995 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
3 files changed, 83 insertions, 17 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 8f91a7db08..e0d8ac3702 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -389,7 +389,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { _syncReceive.set(true); } - if (_0_10session.isStarted() && _capacity == 0 && getSynchronousQueue().isEmpty()) + if (_0_10session.isStarted() && isMessageListenerSet() && _capacity == 0 && getSynchronousQueue().isEmpty()) { messageFlow(); } @@ -536,7 +536,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM private long evaluateCapacity(AMQDestination destination) { long capacity = 0; - if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0) + if (destination.getLink() != null && destination.getLink().getConsumerCapacity() >= 0) { capacity = destination.getLink().getConsumerCapacity(); } @@ -547,4 +547,75 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return capacity; } + @Override + public Message receive(final long l) throws JMSException + { + long capacity = getCapacity(); + try + { + AMQSession_0_10 session = (AMQSession_0_10) getSession(); + + if (capacity == 0 && getMessageListener() == null) + { + session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + + session.sync(); + + } + + Message message = super.receive(l); + + if (message == null && capacity == 0 && getMessageListener() == null) + { + session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 0, + Option.UNRELIABLE); + session.sync(); + + message = super.receiveNoWait(); + } + return message; + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + + @Override + public Message receiveNoWait() throws JMSException + { + long capacity = getCapacity(); + try + { + AMQSession_0_10 session = (AMQSession_0_10) getSession(); + + if (capacity == 0 && getMessageListener() == null) + { + session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + + session.sync(); + } + Message message = super.receiveNoWait(); + if (message == null && capacity == 0 && getMessageListener() == null) + { + session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 0, + Option.UNRELIABLE); + session.sync(); + + message = super.receiveNoWait(); + } + return message; + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 116fd11942..81ccaee1f3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -242,22 +242,17 @@ public class AddressHelper if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) { - MapAccessor capacityProps = new MapAccessor( - (Map) ((Map) _address.getOptions().get(LINK)) - .get(CAPACITY)); - link - .setConsumerCapacity(capacityProps - .getInt(CAPACITY_SOURCE) == null ? 0 - : capacityProps.getInt(CAPACITY_SOURCE)); - link - .setProducerCapacity(capacityProps - .getInt(CAPACITY_TARGET) == null ? 0 - : capacityProps.getInt(CAPACITY_TARGET)); + MapAccessor capacityProps = new MapAccessor((Map) ((Map) _address.getOptions().get(LINK)).get(CAPACITY)); + + Integer sourceCapacity = capacityProps.getInt(CAPACITY_SOURCE); + link.setConsumerCapacity(sourceCapacity == null ? -1 : sourceCapacity); + + Integer targetCapacity = capacityProps.getInt(CAPACITY_TARGET); + link.setProducerCapacity(targetCapacity == null ? -1 : targetCapacity); } else { - int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess - .getInt(CAPACITY); + int cap = _linkPropAccess.getInt(CAPACITY) == null ? -1 : _linkPropAccess.getInt(CAPACITY); link.setConsumerCapacity(cap); link.setProducerCapacity(cap); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 7e9cb3072a..b40abf3f98 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -38,8 +38,8 @@ public class Link private FilterType _filterType = FilterType.SUBJECT; private boolean _isNoLocal; private boolean _isDurable; - private int _consumerCapacity = 0; - private int _producerCapacity = 0; + private int _consumerCapacity = -1; + private int _producerCapacity = -1; private Subscription subscription; private Reliability reliability = Reliability.AT_LEAST_ONCE; private List<Binding> _bindings = Collections.emptyList(); |
