diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-01 12:06:17 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-01 12:06:17 +0000 |
| commit | 6ad1e3aba34346fb9cd3b3f0b1c5a4b215b73cc4 (patch) | |
| tree | 57babbf0adef5682a4a1395c502a0d8de9595336 /qpid/java/client | |
| parent | 84b370ef5cc0741770d2f715d7517ddc351ec5fd (diff) | |
| download | qpid-python-6ad1e3aba34346fb9cd3b3f0b1c5a4b215b73cc4.tar.gz | |
Changed for setting message flow to already started message listeners
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@580929 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
3 files changed, 22 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 89597555d4..f72f2bd9f6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -53,7 +53,7 @@ public class AMQSession_0_10 extends AMQSession /** * The maximum number of pre-fetched messages per destination */ - private static final long MAX_PREFETCH = 100; + public static final long MAX_PREFETCH = 100; /** * The underlying QpidSession diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 412c7e9a8a..c801cf48fe 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -51,7 +51,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me /** * The connection being used by this consumer */ - private AMQConnection _connection; + protected AMQConnection _connection; private String _messageSelector; @@ -86,7 +86,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me protected MessageFactoryRegistry _messageFactory; - private final AMQSession _session; + protected final AMQSession _session; protected AMQProtocolHandler _protocolHandler; @@ -354,7 +354,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me return null; } - Object o = null; + Object o ; if (l > 0) { o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 9e0fb54d0f..c612f34116 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -35,6 +35,7 @@ import org.apache.qpidity.filter.MessageFilter; import org.apache.qpidity.filter.JMSSelectorFilter; import javax.jms.JMSException; +import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; @@ -139,7 +140,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By try { ByteBuffer buff = message.readData(); - ByteBuffer newBuf = ByteBuffer.allocate(buff.remaining()) ; + ByteBuffer newBuf = ByteBuffer.allocate(buff.remaining()); newBuf.put(buff); newMessage.receiveBody(newBuf); } @@ -325,4 +326,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } return result; } + + + public void setMessageListener(final MessageListener messageListener) throws JMSException + { + super.setMessageListener(messageListener); + if (_connection.started()) + { + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, + AMQSession_0_10.MAX_PREFETCH); + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, + 0xFFFFFFFF); + _0_10session.getQpidSession().sync(); + } + } }
\ No newline at end of file |
