diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-17 14:09:46 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-17 14:09:46 +0000 |
| commit | af0b219374ba8257dbc47f907cba2c19c4004b53 (patch) | |
| tree | 52d8c4f84158e421066c19c7397846cf18d362e9 | |
| parent | 69aabf18f7ce2377e411bddb8fb2297e2254b280 (diff) | |
| download | qpid-python-af0b219374ba8257dbc47f907cba2c19c4004b53.tar.gz | |
Cahnged flow control for message selector
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@585514 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 36 |
1 files changed, 27 insertions, 9 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 58b84910d4..bfdaa66618 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -298,12 +298,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By // the current message received is not good, so we need to get a message. if (getMessageListener() == null) { + int oldval = _messageCounter.intValue(); _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1); _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); _0_10session.getQpidSession().sync(); _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + if( _messageCounter.intValue() <= oldval ) + { + // we haven't received a message so tell the receiver to return null + _synchronousQueue.add(new NullTocken()); + } + else + { + _messageCounter.decrementAndGet(); + } } + // we now need to check if we have received a message + } catch(Exception e) { @@ -378,11 +390,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By return result; } - void preDeliver(AbstractJMSMessage msg) - { - _messageCounter.decrementAndGet(); - super.preDeliver(msg); - } public void setMessageListener(final MessageListener messageListener) throws JMSException { @@ -443,12 +450,23 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { o = _synchronousQueue.take(); } - else - { - System.out.println("null"); - } } } + if( o instanceof NullTocken ) + { + o = null; + } return o; } + + protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + { + _messageCounter.decrementAndGet(); + super.preApplicationProcessing(jmsMsg); + } + + private class NullTocken + { + + } }
\ No newline at end of file |
