summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-10-19 11:28:00 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-10-19 11:28:00 +0000
commitc1626dc5b836f793bf6d478d0354ee4aca95e0d7 (patch)
tree800736ced24863b29e10a6afc5a2b4162269404a /java/client/src
parent620ed093eee287f9ec83d6239b560b2124138841 (diff)
downloadqpid-python-c1626dc5b836f793bf6d478d0354ee4aca95e0d7.tar.gz
changed to handle async pre-fetch
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@586382 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java19
2 files changed, 20 insertions, 1 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index c81c83223c..8cfc5402bb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -207,7 +207,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
return _acknowledgeMode;
}
- private boolean isMessageListenerSet()
+ protected boolean isMessageListenerSet()
{
return _messageListener.get() != null;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 549acef389..eb414abea5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -53,6 +53,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
* A counter for keeping the number of available messages for this consumer
*/
private final AtomicLong _messageCounter = new AtomicLong(0);
+
+ /**
+ * Number of received message so far
+ */
+ private final AtomicLong _messagesReceived = new AtomicLong(0);
+
/**
* This class logger
*/
@@ -135,6 +141,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
public void onMessage(Message message)
{
+ if( isMessageListenerSet())
+ {
+ _messagesReceived.incrementAndGet();
+ if( _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH )
+ {
+ // require more credit
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ AMQSession_0_10.MAX_PREFETCH);
+ _messagesReceived.set(0);
+ }
+ }
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
String consumerTag = getConsumerTag().toString();
@@ -417,6 +435,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
+ _messagesReceived.set(0);;
}
}
}