diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-10-16 23:43:55 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-10-16 23:43:55 +0000 |
| commit | da7f2b0b1aa16b3051f89013b962cbd63a2da4d7 (patch) | |
| tree | 01558bd167d772d38d6dfc87dc5bb6a54ab7839d | |
| parent | 4fa9b5fc2626b1ed0b82ca3cadf5ea766accef34 (diff) | |
| download | qpid-python-da7f2b0b1aa16b3051f89013b962cbd63a2da4d7.tar.gz | |
There was an issue with the receiveNoWait method.
I modified it to use the getMessageFromQueue(long l) method by passing a -1.
In this case it will use the same logic as the receive(long timeout) method expect that it will not block on the queue when it does a poll
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@585289 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 37 insertions, 7 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 84bad2e487..c81c83223c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -408,7 +408,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me return null; } - Object o = _synchronousQueue.poll(); + Object o = getMessageFromQueue(-1); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { @@ -418,6 +418,12 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me return m; } + catch (InterruptedException e) + { + _logger.warn("Interrupted: " + e); + + return null; + } finally { releaseReceiving(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 51617d57df..8761a08317 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -253,6 +253,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e); } + + System.out.println("---------------------------------------------------------"); + System.out.println("messageOk : " + messageOk + " pre-acquire mode : " + _preAcquire); + System.out.println("---------------------------------------------------------"); + if (_logger.isDebugEnabled()) { _logger.debug("messageOk " + messageOk); @@ -396,9 +401,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1); - if (l > 0) + if (l == 0) { - o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); + o = _synchronousQueue.take(); + } + else + { + if (l > 0) + { + o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); + } + else + { + o = _synchronousQueue.poll(); + } if (o == null) { _logger.debug("Message Didn't arrive in time, checking if one is inflight"); @@ -416,10 +432,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } } } - else - { - o = _synchronousQueue.take(); - } return o; } }
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 22ddc886f9..2da8b036f5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -94,6 +94,10 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader { o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); } + if (l < 0) + { + o = _synchronousQueue.poll(); + } else { o = _synchronousQueue.take(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 454c74b9fd..e43fef5c94 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -67,6 +67,14 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer message.prepareForSending(); org.apache.qpidity.api.Message qpidityMessage = new ByteBufferMessage(); // set the payload + + if(_logger.isDebugEnabled()) + { + _logger.debug("Message Props: " + message.toString()); + } + + //System.out.println("Message Props" + message.toString()); + try { if (message.getData() != null) |
