summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-09-14 20:39:05 +0000
committerRafael H. Schloming <rhs@apache.org>2007-09-14 20:39:05 +0000
commit97a264479653731473dc6ae3389458357c876ff5 (patch)
treee7070833ef33d54c879d8899f903b580d8320624 /java/client/src/main
parent942379b3196bd37817c52e0fd08cb3c0efff234e (diff)
downloadqpid-python-97a264479653731473dc6ae3389458357c876ff5.tar.gz
Merged revision 572751 from the trunk, this fixes QPID-573.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@575788 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java24
1 files changed, 17 insertions, 7 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index bdebc8e50a..140eaa7c1c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -108,6 +108,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/**
*
@@ -220,6 +221,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private final FlowControllingBlockingQueue _queue;
+ /**
+ * Holds the highest received delivery tag.
+ */
+ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+
/** Holds the dispatcher thread for this session. */
private Dispatcher _dispatcher;
@@ -1278,6 +1284,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
+ _highestDeliveryTag.set(message.getDeliverBody().deliveryTag);
_queue.add(message);
}
}
@@ -2558,6 +2565,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Object _lock = new Object();
+ private final AtomicLong _rollbackMark = new AtomicLong(-1);
public Dispatcher()
{
@@ -2614,7 +2622,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
setConnectionStopped(true);
}
- rejectAllMessages(true);
+ _rollbackMark.set(_highestDeliveryTag.get());
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
@@ -2650,7 +2658,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Allow disptacher to start stopped
synchronized (_lock)
{
- while (connectionStopped())
+ while (!_closed.get() && connectionStopped())
{
try
{
@@ -2675,14 +2683,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_lock.wait();
}
- synchronized (_messageDeliveryLock)
+ if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
{
- dispatchMessage(message);
+ rejectMessage(message, true);
}
-
- while (connectionStopped())
+ else
{
- _lock.wait();
+ synchronized (_messageDeliveryLock)
+ {
+ dispatchMessage(message);
+ }
}
}