summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-10-01 12:06:17 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-10-01 12:06:17 +0000
commit6ad1e3aba34346fb9cd3b3f0b1c5a4b215b73cc4 (patch)
tree57babbf0adef5682a4a1395c502a0d8de9595336 /qpid/java/client
parent84b370ef5cc0741770d2f715d7517ddc351ec5fd (diff)
downloadqpid-python-6ad1e3aba34346fb9cd3b3f0b1c5a4b215b73cc4.tar.gz
Changed for setting message flow to already started message listeners
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@580929 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java19
3 files changed, 22 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 89597555d4..f72f2bd9f6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -53,7 +53,7 @@ public class AMQSession_0_10 extends AMQSession
/**
* The maximum number of pre-fetched messages per destination
*/
- private static final long MAX_PREFETCH = 100;
+ public static final long MAX_PREFETCH = 100;
/**
* The underlying QpidSession
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 412c7e9a8a..c801cf48fe 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -51,7 +51,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
/**
* The connection being used by this consumer
*/
- private AMQConnection _connection;
+ protected AMQConnection _connection;
private String _messageSelector;
@@ -86,7 +86,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
protected MessageFactoryRegistry _messageFactory;
- private final AMQSession _session;
+ protected final AMQSession _session;
protected AMQProtocolHandler _protocolHandler;
@@ -354,7 +354,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
return null;
}
- Object o = null;
+ Object o ;
if (l > 0)
{
o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 9e0fb54d0f..c612f34116 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -35,6 +35,7 @@ import org.apache.qpidity.filter.MessageFilter;
import org.apache.qpidity.filter.JMSSelectorFilter;
import javax.jms.JMSException;
+import javax.jms.MessageListener;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -139,7 +140,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
try
{
ByteBuffer buff = message.readData();
- ByteBuffer newBuf = ByteBuffer.allocate(buff.remaining()) ;
+ ByteBuffer newBuf = ByteBuffer.allocate(buff.remaining());
newBuf.put(buff);
newMessage.receiveBody(newBuf);
}
@@ -325,4 +326,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
return result;
}
+
+
+ public void setMessageListener(final MessageListener messageListener) throws JMSException
+ {
+ super.setMessageListener(messageListener);
+ if (_connection.started())
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ AMQSession_0_10.MAX_PREFETCH);
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
+ 0xFFFFFFFF);
+ _0_10session.getQpidSession().sync();
+ }
+ }
} \ No newline at end of file