diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2008-02-06 15:14:42 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-02-06 15:14:42 +0000 |
| commit | 87ba790072b24199633d71666dd12e4b9645505c (patch) | |
| tree | f8ddfe786eb3beb541211d9ca6742a5781a436dd /java | |
| parent | bb6a385a208a594011e5df85764f282abcb04448 (diff) | |
| download | qpid-python-87ba790072b24199633d71666dd12e4b9645505c.tar.gz | |
Changed for using Window mode see QPID-778
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@619012 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 2 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 29 |
2 files changed, 2 insertions, 29 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 3ffe92d139..59b21e69bc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -375,7 +375,7 @@ public class AMQSession_0_10 extends AMQSession consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); - getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); + getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. if(consumer.isStrated()) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 8828f3553f..1c02f6a3e4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -46,10 +46,6 @@ import java.util.concurrent.atomic.AtomicLong; public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer> implements org.apache.qpidity.nclient.util.MessageListener { - /** - * Number of received message so far - */ - private final AtomicLong _messagesReceived = new AtomicLong(0); /** * This class logger @@ -118,7 +114,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By */ public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId) { - _messagesReceived.incrementAndGet(); boolean messageOk = false; try { @@ -143,20 +138,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } } - /** - * Require more credit for this consumer - */ - private void requireMoreCreditIfNecessary() - { - if (_isStarted && _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH) - { - // require more credit - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - Session.MESSAGE_FLOW_UNIT_MESSAGE, - AMQSession_0_10.MAX_PREFETCH); - _messagesReceived.set(0); - } - } + /** * This method is invoked by the transport layer when a message is delivered for this @@ -239,14 +221,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { // notify the session ((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag()); - if (isMessageListenerSet()) - { - requireMoreCreditIfNecessary(); - } - else if (_synchronousQueue.isEmpty()) - { - requireMoreCreditIfNecessary(); - } //if (!Boolean.getBoolean("noAck")) //{ super.postDeliver(msg); @@ -458,7 +432,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); _0_10session.getQpidSession().sync(); - _messagesReceived.set(0); } } } |
