diff options
Diffstat (limited to 'java/client/src')
4 files changed, 37 insertions, 7 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 84bad2e487..c81c83223c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -408,7 +408,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me return null; } - Object o = _synchronousQueue.poll(); + Object o = getMessageFromQueue(-1); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { @@ -418,6 +418,12 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me return m; } + catch (InterruptedException e) + { + _logger.warn("Interrupted: " + e); + + return null; + } finally { releaseReceiving(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 51617d57df..8761a08317 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -253,6 +253,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e); } + + System.out.println("---------------------------------------------------------"); + System.out.println("messageOk : " + messageOk + " pre-acquire mode : " + _preAcquire); + System.out.println("---------------------------------------------------------"); + if (_logger.isDebugEnabled()) { _logger.debug("messageOk " + messageOk); @@ -396,9 +401,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1); - if (l > 0) + if (l == 0) { - o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); + o = _synchronousQueue.take(); + } + else + { + if (l > 0) + { + o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); + } + else + { + o = _synchronousQueue.poll(); + } if (o == null) { _logger.debug("Message Didn't arrive in time, checking if one is inflight"); @@ -416,10 +432,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } } } - else - { - o = _synchronousQueue.take(); - } return o; } }
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 22ddc886f9..2da8b036f5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -94,6 +94,10 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader { o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); } + if (l < 0) + { + o = _synchronousQueue.poll(); + } else { o = _synchronousQueue.take(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 454c74b9fd..e43fef5c94 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -67,6 +67,14 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer message.prepareForSending(); org.apache.qpidity.api.Message qpidityMessage = new ByteBufferMessage(); // set the payload + + if(_logger.isDebugEnabled()) + { + _logger.debug("Message Props: " + message.toString()); + } + + //System.out.println("Message Props" + message.toString()); + try { if (message.getData() != null) |
