diff options
Diffstat (limited to 'qpid/java')
6 files changed, 97 insertions, 33 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index f89b3715f3..5850c08b9b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -344,7 +344,7 @@ public class AMQSession_0_10 extends AMQSession new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); - getQpidSession().messageFlowMode(tag.toString(), Session.MESSAGE_FLOW_MODE_WINDOW); + // We need to sync so that we get notify of an error. getQpidSession().sync(); getCurrentException(); @@ -437,17 +437,30 @@ public class AMQSession_0_10 extends AMQSession for (BasicMessageConsumer consumer : _consumers.values()) { getQpidSession().messageStop(consumer.getConsumerTag().toString()); + getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); + } } else { for (BasicMessageConsumer consumer : _consumers.values()) { - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE, - MAX_PREFETCH); - // todo this - getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + //only set if msg list is null + try + { + if (consumer.getMessageListener() != null) + { + getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE, + MAX_PREFETCH); + // todo this + getQpidSession() + .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + } + } + catch(Exception e) + { + throw new AMQException(AMQConstant.INTERNAL_ERROR,"Error while trying to get the listener",e); + } } } // We need to sync so that we get notify of an error. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index ca6312e79f..fa85afc6e8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -82,7 +82,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors * <p/> Argument true indicates we want strict FIFO semantics */ - private final ArrayBlockingQueue _synchronousQueue; + protected final ArrayBlockingQueue _synchronousQueue; protected MessageFactoryRegistry _messageFactory; @@ -354,15 +354,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me return null; } - Object o ; - if (l > 0) - { - o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); - } - else - { - o = _synchronousQueue.take(); - } + Object o = getMessageFromQueue(l); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) @@ -385,6 +377,8 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } } + public abstract Object getMessageFromQueue(long l) throws InterruptedException; + private boolean closeOnAutoClose() throws JMSException { if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index f1b8c2d3e7..b0cd749a2a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -29,6 +29,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpidity.api.Message; +import org.apache.qpidity.nclient.Session; import org.apache.qpidity.transport.*; import org.apache.qpidity.QpidException; import org.apache.qpidity.filter.MessageFilter; @@ -38,6 +39,7 @@ import javax.jms.JMSException; import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; /** * This is a 0.10 message consumer. @@ -65,7 +67,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By */ private boolean _preAcquire = true; - //--- constructor + //--- constructor protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, @@ -127,7 +129,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By public void onMessage(Message message) - { + { int channelId = getSession().getChannelId(); long deliveryId = message.getMessageTransferId(); String consumerTag = getConsumerTag().toString(); @@ -215,7 +217,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException { boolean messageOk = true; - // TODO Use a tag for fiding out if message filtering is done here or by the broker. + // TODO Use a tag for fiding out if message filtering is done here or by the broker. try { if (getMessageSelector() != null) @@ -334,15 +336,45 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By public void setMessageListener(final MessageListener messageListener) throws JMSException { super.setMessageListener(messageListener); - if (_connection.started()) + if (messageListener == null) + { + _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); + _0_10session.getQpidSession().messageStop(getConsumerTag().toString()); + _0_10session.getQpidSession().sync(); + } + else + { + if (_connection.started()) + { + _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, + AMQSession_0_10.MAX_PREFETCH); + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, + 0xFFFFFFFF); + _0_10session.getQpidSession().sync(); + } + } + } + + public Object getMessageFromQueue(long l) throws InterruptedException + { + Object o; + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1); + + if (l > 0) { - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, - AMQSession_0_10.MAX_PREFETCH); - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, - 0xFFFFFFFF); + o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); + _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); _0_10session.getQpidSession().sync(); + o = _synchronousQueue.poll(); + } + else + { + o = _synchronousQueue.take(); } + return null; } }
\ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index c82ed96e4c..1bca13ac02 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import java.util.concurrent.TimeUnit; + import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -84,4 +86,18 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); } + + public Object getMessageFromQueue(long l) throws InterruptedException + { + Object o; + if (l > 0) + { + o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); + } + else + { + o = _synchronousQueue.take(); + } + return null; + } }
\ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java index 993b0870aa..4ea8df3273 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java @@ -147,6 +147,5 @@ public class BasicInteropTest implements ClosedListener t.testSendMessage(); t.testMessageFlush(); t.close(); - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java b/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java index 7f55dcbd67..9baade610b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,6 +28,7 @@ import javax.jms.Queue; import org.apache.qpidity.QpidException; import org.apache.qpidity.nclient.MessagePartListener; +import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; import org.apache.qpidity.exchange.ExchangeDefaults; import org.apache.qpidity.filter.JMSSelectorFilter; @@ -159,7 +160,7 @@ public class MessageConsumerImpl extends MessageActor // bind this queue with the topic exchange getSession().getQpidSession() .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getRoutingKey(), null); - // subscribe to this topic + // subscribe to this topic getSession().getQpidSession() .messageSubscribe(queueName, getMessageActorID(), org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, @@ -174,12 +175,21 @@ public class MessageConsumerImpl extends MessageActor // set the flow mode getSession().getQpidSession() .messageFlowMode(getMessageActorID(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_MODE_CREDIT); + // Set unlimited byte credits + getSession().getQpidSession().messageFlow(getMessageActorID(), Session.MESSAGE_FLOW_UNIT_BYTE, -1); // this will prevent the broker from sending more than one message // When a messageListener is set the flow will be adjusted. // until then we assume it's for synchronous message consumption requestCredit(1); - requestSync(); + try + { + requestSync(); + } + catch(Exception e) + { + e.printStackTrace(); + } // check for an exception if (getSession().getCurrentException() != null) { |
