summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-09-23 13:00:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-09-23 13:00:05 +0000
commitfbc287bd02a6bb91b709b798abfb859d8223fc43 (patch)
tree0632659ef806e8eace91234ba3e0dd3323a26a83 /qpid/java/client
parent61f7f00e14a5ef25f01bac4a14611f9598540c80 (diff)
downloadqpid-python-fbc287bd02a6bb91b709b798abfb859d8223fc43.tar.gz
QPID-3678 : [Java Client] Add support for setting link capacity to zero in ADDR addresses
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1626995 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java75
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java21
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java4
3 files changed, 83 insertions, 17 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 8f91a7db08..e0d8ac3702 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -389,7 +389,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
_syncReceive.set(true);
}
- if (_0_10session.isStarted() && _capacity == 0 && getSynchronousQueue().isEmpty())
+ if (_0_10session.isStarted() && isMessageListenerSet() && _capacity == 0 && getSynchronousQueue().isEmpty())
{
messageFlow();
}
@@ -536,7 +536,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private long evaluateCapacity(AMQDestination destination)
{
long capacity = 0;
- if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0)
+ if (destination.getLink() != null && destination.getLink().getConsumerCapacity() >= 0)
{
capacity = destination.getLink().getConsumerCapacity();
}
@@ -547,4 +547,75 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return capacity;
}
+ @Override
+ public Message receive(final long l) throws JMSException
+ {
+ long capacity = getCapacity();
+ try
+ {
+ AMQSession_0_10 session = (AMQSession_0_10) getSession();
+
+ if (capacity == 0 && getMessageListener() == null)
+ {
+ session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+
+ session.sync();
+
+ }
+
+ Message message = super.receive(l);
+
+ if (message == null && capacity == 0 && getMessageListener() == null)
+ {
+ session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 0,
+ Option.UNRELIABLE);
+ session.sync();
+
+ message = super.receiveNoWait();
+ }
+ return message;
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
+ @Override
+ public Message receiveNoWait() throws JMSException
+ {
+ long capacity = getCapacity();
+ try
+ {
+ AMQSession_0_10 session = (AMQSession_0_10) getSession();
+
+ if (capacity == 0 && getMessageListener() == null)
+ {
+ session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+
+ session.sync();
+ }
+ Message message = super.receiveNoWait();
+ if (message == null && capacity == 0 && getMessageListener() == null)
+ {
+ session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 0,
+ Option.UNRELIABLE);
+ session.sync();
+
+ message = super.receiveNoWait();
+ }
+ return message;
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 116fd11942..81ccaee1f3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -242,22 +242,17 @@ public class AddressHelper
if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
{
- MapAccessor capacityProps = new MapAccessor(
- (Map) ((Map) _address.getOptions().get(LINK))
- .get(CAPACITY));
- link
- .setConsumerCapacity(capacityProps
- .getInt(CAPACITY_SOURCE) == null ? 0
- : capacityProps.getInt(CAPACITY_SOURCE));
- link
- .setProducerCapacity(capacityProps
- .getInt(CAPACITY_TARGET) == null ? 0
- : capacityProps.getInt(CAPACITY_TARGET));
+ MapAccessor capacityProps = new MapAccessor((Map) ((Map) _address.getOptions().get(LINK)).get(CAPACITY));
+
+ Integer sourceCapacity = capacityProps.getInt(CAPACITY_SOURCE);
+ link.setConsumerCapacity(sourceCapacity == null ? -1 : sourceCapacity);
+
+ Integer targetCapacity = capacityProps.getInt(CAPACITY_TARGET);
+ link.setProducerCapacity(targetCapacity == null ? -1 : targetCapacity);
}
else
{
- int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess
- .getInt(CAPACITY);
+ int cap = _linkPropAccess.getInt(CAPACITY) == null ? -1 : _linkPropAccess.getInt(CAPACITY);
link.setConsumerCapacity(cap);
link.setProducerCapacity(cap);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index 7e9cb3072a..b40abf3f98 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -38,8 +38,8 @@ public class Link
private FilterType _filterType = FilterType.SUBJECT;
private boolean _isNoLocal;
private boolean _isDurable;
- private int _consumerCapacity = 0;
- private int _producerCapacity = 0;
+ private int _consumerCapacity = -1;
+ private int _producerCapacity = -1;
private Subscription subscription;
private Reliability reliability = Reliability.AT_LEAST_ONCE;
private List<Binding> _bindings = Collections.emptyList();