summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-02-06 16:00:22 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-02-06 16:00:22 +0000
commita53459d9f89b67d10e2276f58c515c63ea333244 (patch)
tree6bd3349b4f09b84f696d0b267a8cb1682dfa22d7 /java
parent87ba790072b24199633d71666dd12e4b9645505c (diff)
downloadqpid-python-a53459d9f89b67d10e2276f58c515c63ea333244.tar.gz
Added close logic for releasing pre-fetched messages, see QPID-778
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@619043 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java14
3 files changed, 42 insertions, 3 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 3a59163f9b..b16c2682de 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -195,7 +195,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
*/
- private final FlowControllingBlockingQueue _queue;
+ protected final FlowControllingBlockingQueue _queue;
/**
* Holds the highest received delivery tag.
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 59b21e69bc..246e27dde0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.ErrorCode;
@@ -43,6 +44,8 @@ import javax.jms.IllegalStateException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.UUID;
import java.util.Map;
+import java.util.Iterator;
+
/**
* This is a 0.10 Session
*/
@@ -239,6 +242,25 @@ public class AMQSession_0_10 extends AMQSession
}
/**
+ * We need to release message that may be pre-fetched in the local queue
+ *
+ * @throws JMSException
+ */
+ public void close() throws JMSException
+ {
+ super.close();
+ // We need to release pre-fetched messages
+ Iterator messages=_queue.iterator();
+ while (messages.hasNext())
+ {
+ UnprocessedMessage message=(UnprocessedMessage) messages.next();
+ messages.remove();
+ rejectMessage(message, true);
+ }
+ }
+
+
+ /**
* Commit the receipt and the delivery of all messages exchanged by this session resources.
*/
public void sendCommit() throws AMQException, FailoverException
@@ -615,10 +637,13 @@ public class AMQSession_0_10 extends AMQSession
}
}
- void stop() throws AMQException
+
+
+
+ void stop() throws AMQException
{
super.stop();
- for(BasicMessageConsumer c: _consumers.values())
+ for(BasicMessageConsumer c: _consumers.values())
{
c.stop();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 1c02f6a3e4..3534bade61 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -39,6 +39,7 @@ import javax.jms.MessageListener;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.Iterator;
/**
* This is a 0.10 message consumer.
@@ -450,4 +451,17 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
_isStarted = false;
}
+
+ public void close() throws JMSException
+ {
+ super.close();
+ // release message that may be staged
+ Iterator messages=_synchronousQueue.iterator();
+ while (messages.hasNext())
+ {
+ AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+ messages.remove();
+ _session.rejectMessage(message, true);
+ }
+ }
} \ No newline at end of file