From 371bf976678df3ffba6ebc14a7ff0ed676097ce9 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Thu, 6 Oct 2011 16:40:43 +0000 Subject: QPID-3527: update handling of auto-ack messages for 0-10 to send acks (asynchronously) on a per-message basis rather than batching for 1 second, update handling for other ack modes to be clearer with respect to 0-8/0-10 behavioural differences. Remove some redundant methods from AMQSession, updating handling of 'no consume'/'isBrowseOnly' such that BasicMessageConsumer is always supplied a single consistent answer for whether it is non-consuming or not. Applied patch from Oleksandr Rudyy and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1179698 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQDestination.java | 2 +- .../java/org/apache/qpid/client/AMQSession.java | 64 +++++++++------------- .../org/apache/qpid/client/AMQSession_0_10.java | 2 +- .../apache/qpid/client/BasicMessageConsumer.java | 41 ++++++-------- .../qpid/client/BasicMessageConsumer_0_10.java | 23 +++----- .../client/message/AMQMessageDelegate_0_10.java | 1 - 6 files changed, 53 insertions(+), 80 deletions(-) (limited to 'java') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 3ef32fb008..acd46da11a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -793,7 +793,7 @@ public abstract class AMQDestination implements Destination, Referenceable return _browseOnly; } - public void setBrowseOnly(boolean b) + private void setBrowseOnly(boolean b) { _browseOnly = b; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a41f2f9b17..a477832892 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -952,7 +952,7 @@ public abstract class AMQSession(topic, + createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false)); } /** @@ -1457,10 +1433,11 @@ public abstract class AMQSession(topic, + createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, noLocal, + true, messageSelector, null, false, false)); } public TemporaryQueue createTemporaryQueue() throws JMSException @@ -1985,6 +1962,12 @@ public abstract class AMQSession= prefetch/2 || maxAckDelay <= 0) + if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE) { flushAcknowledgments(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index e6e1398a35..3b807591b0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -734,34 +734,27 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa case Session.PRE_ACKNOWLEDGE: _session.acknowledgeMessage(msg.getDeliveryTag(), false); break; + case Session.AUTO_ACKNOWLEDGE: + //fall through + case Session.DUPS_OK_ACKNOWLEDGE: + _session.addUnacknowledgedMessage(msg.getDeliveryTag()); + break; case Session.CLIENT_ACKNOWLEDGE: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - else - { - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - msg.setAMQSession(_session); - _session.addUnacknowledgedMessage(msg.getDeliveryTag()); - _session.markDirty(); - } + // we set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame + msg.setAMQSession(_session); + _session.addUnacknowledgedMessage(msg.getDeliveryTag()); + _session.markDirty(); break; case Session.SESSION_TRANSACTED: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - else - { - _session.addDeliveredMessage(msg.getDeliveryTag()); - _session.markDirty(); - } - + _session.addDeliveredMessage(msg.getDeliveryTag()); + _session.markDirty(); + break; + case Session.NO_ACKNOWLEDGE: + //do nothing. + //path used for NO-ACK consumers, and browsers (see constructor). break; } - } void postDeliver(AbstractJMSMessage msg) @@ -883,7 +876,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa public boolean isNoConsume() { - return _noConsume || _destination.isBrowseOnly() ; + return _noConsume; } public void rollback() diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index d3494298d3..3c24c67f9b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -65,11 +65,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer