diff options
| author | Keith Wall <kwall@apache.org> | 2011-10-18 11:20:53 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-10-18 11:20:53 +0000 |
| commit | 5a5e04545e7e45977efaf077da21b64f870caf04 (patch) | |
| tree | 86c6788bc7f1db19fb4191eafe5aa4398a449b04 /java/client | |
| parent | b7bc31df25274935c2c00f9afcc1aad37bd93354 (diff) | |
| download | qpid-python-5a5e04545e7e45977efaf077da21b64f870caf04.tar.gz | |
QPID-3542: ensure session complete sent for filtered out messages
Applied patch from Andrew MacBean <andymacbean@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1185580 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 27 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 23 |
2 files changed, 41 insertions, 9 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 3812e612aa..c6a64ec894 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -294,23 +294,34 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - void messageAcknowledge(RangeSet ranges, boolean accept) + void messageAcknowledge(final RangeSet ranges, final boolean accept) { messageAcknowledge(ranges,accept,false); } - void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit) + void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit) { - Session ssn = getQpidSession(); - for (Range range : ranges) + final Session ssn = getQpidSession(); + flushProcessed(ranges,accept); + if (accept) { - ssn.processed(range); + ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE); } - ssn.flushProcessed(accept ? BATCH : NONE); - if (accept) + } + + /** + * Flush any outstanding commands. This causes session complete to be sent. + * @param ranges the range of command ids. + * @param batch true if batched. + */ + void flushProcessed(final RangeSet ranges, final boolean batch) + { + final Session ssn = getQpidSession(); + for (final Range range : ranges) { - ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE); + ssn.processed(range); } + ssn.flushProcessed(batch ? BATCH : NONE); } /** diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 5fba351d8a..548e274571 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -269,8 +269,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (_logger.isDebugEnabled()) { - _logger.debug("filterMessage - not ack'ing messaage as not aquired"); + _logger.debug("filterMessage - not ack'ing message as not acquired"); } + flushUnwantedMessage(message); } } @@ -312,6 +313,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } /** + * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated + * processed to ensure their AMQP command-id is marked completed. + * + * @param message The unwanted message to be flushed + * @throws AMQException If the unwanted message cannot be flushed due to some internal error. + */ + private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException + { + final RangeSet ranges = new RangeSet(); + ranges.add((int) message.getDeliveryTag()); + _0_10session.flushProcessed(ranges,false); + + final AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) + { + throw amqe; + } + } + + /** * Acquire a message * * @param message The message to be acquired |
