summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-10-19 11:28:00 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-10-19 11:28:00 +0000
commitff36821da030a42f220a52dda81ca5e411df226c (patch)
treea5fa2e19d6448a16836e8c9a978f311baa277655 /qpid/java/client/src
parentc930c2638c6ab3a9c709bbbddc00c0ba8f0495c4 (diff)
downloadqpid-python-ff36821da030a42f220a52dda81ca5e411df226c.tar.gz
changed to handle async pre-fetch
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@586382 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java19
2 files changed, 20 insertions, 1 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index c81c83223c..8cfc5402bb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -207,7 +207,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
return _acknowledgeMode;
}
- private boolean isMessageListenerSet()
+ protected boolean isMessageListenerSet()
{
return _messageListener.get() != null;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 549acef389..eb414abea5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -53,6 +53,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
* A counter for keeping the number of available messages for this consumer
*/
private final AtomicLong _messageCounter = new AtomicLong(0);
+
+ /**
+ * Number of received message so far
+ */
+ private final AtomicLong _messagesReceived = new AtomicLong(0);
+
/**
* This class logger
*/
@@ -135,6 +141,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
public void onMessage(Message message)
{
+ if( isMessageListenerSet())
+ {
+ _messagesReceived.incrementAndGet();
+ if( _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH )
+ {
+ // require more credit
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ AMQSession_0_10.MAX_PREFETCH);
+ _messagesReceived.set(0);
+ }
+ }
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
String consumerTag = getConsumerTag().toString();
@@ -417,6 +435,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
+ _messagesReceived.set(0);;
}
}
}