summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-10-16 23:43:55 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-10-16 23:43:55 +0000
commitda7f2b0b1aa16b3051f89013b962cbd63a2da4d7 (patch)
tree01558bd167d772d38d6dfc87dc5bb6a54ab7839d
parent4fa9b5fc2626b1ed0b82ca3cadf5ea766accef34 (diff)
downloadqpid-python-da7f2b0b1aa16b3051f89013b962cbd63a2da4d7.tar.gz
There was an issue with the receiveNoWait method.
I modified it to use the getMessageFromQueue(long l) method by passing a -1. In this case it will use the same logic as the receive(long timeout) method expect that it will not block on the queue when it does a poll git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@585289 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java8
4 files changed, 37 insertions, 7 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 84bad2e487..c81c83223c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -408,7 +408,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
return null;
}
- Object o = _synchronousQueue.poll();
+ Object o = getMessageFromQueue(-1);
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
@@ -418,6 +418,12 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
return m;
}
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted: " + e);
+
+ return null;
+ }
finally
{
releaseReceiving();
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 51617d57df..8761a08317 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -253,6 +253,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e);
}
+
+ System.out.println("---------------------------------------------------------");
+ System.out.println("messageOk : " + messageOk + " pre-acquire mode : " + _preAcquire);
+ System.out.println("---------------------------------------------------------");
+
if (_logger.isDebugEnabled())
{
_logger.debug("messageOk " + messageOk);
@@ -396,9 +401,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
- if (l > 0)
+ if (l == 0)
{
- o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ o = _synchronousQueue.take();
+ }
+ else
+ {
+ if (l > 0)
+ {
+ o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ }
+ else
+ {
+ o = _synchronousQueue.poll();
+ }
if (o == null)
{
_logger.debug("Message Didn't arrive in time, checking if one is inflight");
@@ -416,10 +432,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
}
}
- else
- {
- o = _synchronousQueue.take();
- }
return o;
}
} \ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 22ddc886f9..2da8b036f5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -94,6 +94,10 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader
{
o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
}
+ if (l < 0)
+ {
+ o = _synchronousQueue.poll();
+ }
else
{
o = _synchronousQueue.take();
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 454c74b9fd..e43fef5c94 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -67,6 +67,14 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
message.prepareForSending();
org.apache.qpidity.api.Message qpidityMessage = new ByteBufferMessage();
// set the payload
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Message Props: " + message.toString());
+ }
+
+ //System.out.println("Message Props" + message.toString());
+
try
{
if (message.getData() != null)