diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2008-06-03 15:25:36 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-06-03 15:25:36 +0000 |
| commit | 244909ffb0d10ec1ac394c80d1d57d574503ca17 (patch) | |
| tree | 78b05e83590aaf8ecded23dee5cd6b92b20da492 /java | |
| parent | 888e7c269275207be3463fcf2ea109db80bf86e5 (diff) | |
| download | qpid-python-244909ffb0d10ec1ac394c80d1d57d574503ca17.tar.gz | |
QPID-1112: Update previous commit by re-using messageAcknowledge (added a flag specifying whether to send an messageAccept)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@662827 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
5 files changed, 22 insertions, 46 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index b9094db5bd..0753ee539a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -160,7 +160,7 @@ public class AMQSession_0_10 extends AMQSession ranges.add((int) deliveryTag); _unacknowledgedMessageTags.remove(deliveryTag); } - getQpidSession().messageAcknowledge(ranges); + getQpidSession().messageAcknowledge(ranges, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); } /** diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index c47aee0410..ae597b1703 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -24,10 +24,12 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; +import org.apache.qpid.jms.*; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpidity.api.Message; import org.apache.qpidity.transport.*; +import org.apache.qpidity.transport.Session; import org.apache.qpidity.QpidException; import org.apache.qpidity.filter.MessageFilter; import org.apache.qpidity.filter.JMSSelectorFilter; @@ -77,12 +79,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By */ private final AtomicBoolean _syncReceive = new AtomicBoolean(false); - /** - * Used for no-ack mode so to send session completion command - */ - private int _numberReceivedMessages = 0; - private int _firstMessageToComplete; - //--- constructor protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, @@ -165,25 +161,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By */ public void onMessage(Message message) { - /** - * For no-ack mode - */ - if( _acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE ) - { - _numberReceivedMessages++; - if(_numberReceivedMessages == 1) - { - _firstMessageToComplete = message.getMessageTransferId(); - } - if(_numberReceivedMessages >= getSession().getAMQConnection().getMaxPrefetch() ) - { - RangeSet r = new RangeSet(); - r.add(_firstMessageToComplete, message.getMessageTransferId()); - _0_10session.getQpidSession().sessionCompleted(r, Option.TIMELY_REPLY); - _numberReceivedMessages = 0; - } - } - int channelId = getSession().getChannelId(); long deliveryId = message.getMessageTransferId(); AMQShortString consumerTag = getConsumerTag(); @@ -383,7 +360,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { RangeSet ranges = new RangeSet(); ranges.add((int) message.getDeliveryTag()); - _0_10session.getQpidSession().messageAcknowledge(ranges); + _0_10session.getQpidSession().messageAcknowledge(ranges, + _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE ); _0_10session.getCurrentException(); } } @@ -499,4 +477,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } return o; } + + void postDeliver(AbstractJMSMessage msg) throws JMSException + { + super.postDeliver(msg); + if(_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + } } diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java index 28218e01d6..833a26da87 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java @@ -65,16 +65,6 @@ public interface Session public void sessionDetach(byte[] name); - /** - * This control is sent by the receiver of commands, and handled by the sender - * of commands. It informs the sender of all commands completed by the receiver. - * This excludes commands known by the receiver to be considered complete at the sender. - * - * @param commands completed commands. - * @param options {@link Option#TIMELY_REPLY} If set, the sender is no longer free to delay the known-completed reply. - */ - public void sessionCompleted(RangeSet commands, Option... options); - public void sessionRequestTimeout(long expiry); public byte[] getName(); @@ -328,8 +318,9 @@ public interface Session * pre-acquire mode or by explicitly acquiring them. * * @param ranges Range of messages to be acknowledged. + * @param accept pecify whether to send a message accept to the broker */ - public void messageAcknowledge(RangeSet ranges); + public void messageAcknowledge(RangeSet ranges, boolean accept); /** * Reject a range of acquired messages. diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java index 58ffffb12b..0c0341490a 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java @@ -59,14 +59,17 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen super(name); } - public void messageAcknowledge(RangeSet ranges) + public void messageAcknowledge(RangeSet ranges, boolean accept) { for (Range range : ranges) { super.processed(range); } super.flushProcessed(); - messageAccept(ranges); + if( accept ) + { + messageAccept(ranges); + } } public void messageSubscribe(String queue, String destination, short acceptMode, short acquireMode, MessagePartListener listener, Map<String, Object> filter, Option... options) @@ -105,11 +108,6 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen _currentDataSizeNotSynced = 0; } - public void sessionCompleted(RangeSet commands, Option ... options) - { - super.sessionCompleted(commands, options); - } - /* ------------------------- * Data methods * ------------------------*/ diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java index 0a15189b48..e452091622 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java @@ -108,7 +108,7 @@ public class BasicInteropTest implements ClosedListener System.out.println("--------/Message Received--------"); RangeSet ack = new RangeSet(); ack.add(message.getMessageTransferId(),message.getMessageTransferId()); - session.messageAcknowledge(ack); + session.messageAcknowledge(ack, true); } }), |
