summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-02-06 15:14:42 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-02-06 15:14:42 +0000
commit87ba790072b24199633d71666dd12e4b9645505c (patch)
treef8ddfe786eb3beb541211d9ca6742a5781a436dd /java
parentbb6a385a208a594011e5df85764f282abcb04448 (diff)
downloadqpid-python-87ba790072b24199633d71666dd12e4b9645505c.tar.gz
Changed for using Window mode see QPID-778
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@619012 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java29
2 files changed, 2 insertions, 29 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 3ffe92d139..59b21e69bc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -375,7 +375,7 @@ public class AMQSession_0_10 extends AMQSession
consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
- getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+ getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
if(consumer.isStrated())
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 8828f3553f..1c02f6a3e4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -46,10 +46,6 @@ import java.util.concurrent.atomic.AtomicLong;
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
implements org.apache.qpidity.nclient.util.MessageListener
{
- /**
- * Number of received message so far
- */
- private final AtomicLong _messagesReceived = new AtomicLong(0);
/**
* This class logger
@@ -118,7 +114,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
*/
public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
{
- _messagesReceived.incrementAndGet();
boolean messageOk = false;
try
{
@@ -143,20 +138,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
}
- /**
- * Require more credit for this consumer
- */
- private void requireMoreCreditIfNecessary()
- {
- if (_isStarted && _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH)
- {
- // require more credit
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- Session.MESSAGE_FLOW_UNIT_MESSAGE,
- AMQSession_0_10.MAX_PREFETCH);
- _messagesReceived.set(0);
- }
- }
+
/**
* This method is invoked by the transport layer when a message is delivered for this
@@ -239,14 +221,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
// notify the session
((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag());
- if (isMessageListenerSet())
- {
- requireMoreCreditIfNecessary();
- }
- else if (_synchronousQueue.isEmpty())
- {
- requireMoreCreditIfNecessary();
- }
//if (!Boolean.getBoolean("noAck"))
//{
super.postDeliver(msg);
@@ -458,7 +432,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
- _messagesReceived.set(0);
}
}
}