diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2008-02-06 16:00:22 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-02-06 16:00:22 +0000 |
| commit | a53459d9f89b67d10e2276f58c515c63ea333244 (patch) | |
| tree | 6bd3349b4f09b84f696d0b267a8cb1682dfa22d7 /java | |
| parent | 87ba790072b24199633d71666dd12e4b9645505c (diff) | |
| download | qpid-python-a53459d9f89b67d10e2276f58c515c63ea333244.tar.gz | |
Added close logic for releasing pre-fetched messages, see QPID-778
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@619043 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 42 insertions, 3 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 3a59163f9b..b16c2682de 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -195,7 +195,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue. */ - private final FlowControllingBlockingQueue _queue; + protected final FlowControllingBlockingQueue _queue; /** * Holds the highest received delivery tag. diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 59b21e69bc..246e27dde0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.FiledTableSupport; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; import org.apache.qpidity.ErrorCode; @@ -43,6 +44,8 @@ import javax.jms.IllegalStateException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.UUID; import java.util.Map; +import java.util.Iterator; + /** * This is a 0.10 Session */ @@ -239,6 +242,25 @@ public class AMQSession_0_10 extends AMQSession } /** + * We need to release message that may be pre-fetched in the local queue + * + * @throws JMSException + */ + public void close() throws JMSException + { + super.close(); + // We need to release pre-fetched messages + Iterator messages=_queue.iterator(); + while (messages.hasNext()) + { + UnprocessedMessage message=(UnprocessedMessage) messages.next(); + messages.remove(); + rejectMessage(message, true); + } + } + + + /** * Commit the receipt and the delivery of all messages exchanged by this session resources. */ public void sendCommit() throws AMQException, FailoverException @@ -615,10 +637,13 @@ public class AMQSession_0_10 extends AMQSession } } - void stop() throws AMQException + + + + void stop() throws AMQException { super.stop(); - for(BasicMessageConsumer c: _consumers.values()) + for(BasicMessageConsumer c: _consumers.values()) { c.stop(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 1c02f6a3e4..3534bade61 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -39,6 +39,7 @@ import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicLong; +import java.util.Iterator; /** * This is a 0.10 message consumer. @@ -450,4 +451,17 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { _isStarted = false; } + + public void close() throws JMSException + { + super.close(); + // release message that may be staged + Iterator messages=_synchronousQueue.iterator(); + while (messages.hasNext()) + { + AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); + messages.remove(); + _session.rejectMessage(message, true); + } + } }
\ No newline at end of file |
