From c15d7a342bb41b9c75f590e4a5050a5e0717a03c Mon Sep 17 00:00:00 2001 From: Arnaud Simon Date: Wed, 6 Feb 2008 16:00:22 +0000 Subject: Added close logic for releasing pre-fetched messages, see QPID-778 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@619043 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 2 +- .../org/apache/qpid/client/AMQSession_0_10.java | 29 ++++++++++++++++++++-- .../qpid/client/BasicMessageConsumer_0_10.java | 14 +++++++++++ 3 files changed, 42 insertions(+), 3 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 3a59163f9b..b16c2682de 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -195,7 +195,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue. */ - private final FlowControllingBlockingQueue _queue; + protected final FlowControllingBlockingQueue _queue; /** * Holds the highest received delivery tag. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 59b21e69bc..246e27dde0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.FiledTableSupport; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; import org.apache.qpidity.ErrorCode; @@ -43,6 +44,8 @@ import javax.jms.IllegalStateException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.UUID; import java.util.Map; +import java.util.Iterator; + /** * This is a 0.10 Session */ @@ -238,6 +241,25 @@ public class AMQSession_0_10 extends AMQSession getCurrentException(); } + /** + * We need to release message that may be pre-fetched in the local queue + * + * @throws JMSException + */ + public void close() throws JMSException + { + super.close(); + // We need to release pre-fetched messages + Iterator messages=_queue.iterator(); + while (messages.hasNext()) + { + UnprocessedMessage message=(UnprocessedMessage) messages.next(); + messages.remove(); + rejectMessage(message, true); + } + } + + /** * Commit the receipt and the delivery of all messages exchanged by this session resources. */ @@ -615,10 +637,13 @@ public class AMQSession_0_10 extends AMQSession } } - void stop() throws AMQException + + + + void stop() throws AMQException { super.stop(); - for(BasicMessageConsumer c: _consumers.values()) + for(BasicMessageConsumer c: _consumers.values()) { c.stop(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 1c02f6a3e4..3534bade61 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -39,6 +39,7 @@ import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicLong; +import java.util.Iterator; /** * This is a 0.10 message consumer. @@ -450,4 +451,17 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer