diff options
Diffstat (limited to 'qpid/java/client')
3 files changed, 22 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 89597555d4..f72f2bd9f6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -53,7 +53,7 @@ public class AMQSession_0_10 extends AMQSession /** * The maximum number of pre-fetched messages per destination */ - private static final long MAX_PREFETCH = 100; + public static final long MAX_PREFETCH = 100; /** * The underlying QpidSession diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 412c7e9a8a..c801cf48fe 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -51,7 +51,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me /** * The connection being used by this consumer */ - private AMQConnection _connection; + protected AMQConnection _connection; private String _messageSelector; @@ -86,7 +86,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me protected MessageFactoryRegistry _messageFactory; - private final AMQSession _session; + protected final AMQSession _session; protected AMQProtocolHandler _protocolHandler; @@ -354,7 +354,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me return null; } - Object o = null; + Object o ; if (l > 0) { o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 9e0fb54d0f..c612f34116 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -35,6 +35,7 @@ import org.apache.qpidity.filter.MessageFilter; import org.apache.qpidity.filter.JMSSelectorFilter; import javax.jms.JMSException; +import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; @@ -139,7 +140,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By try { ByteBuffer buff = message.readData(); - ByteBuffer newBuf = ByteBuffer.allocate(buff.remaining()) ; + ByteBuffer newBuf = ByteBuffer.allocate(buff.remaining()); newBuf.put(buff); newMessage.receiveBody(newBuf); } @@ -325,4 +326,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } return result; } + + + public void setMessageListener(final MessageListener messageListener) throws JMSException + { + super.setMessageListener(messageListener); + if (_connection.started()) + { + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, + AMQSession_0_10.MAX_PREFETCH); + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, + 0xFFFFFFFF); + _0_10session.getQpidSession().sync(); + } + } }
\ No newline at end of file |
