summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-09-04 17:42:38 +0000
committerRafael H. Schloming <rhs@apache.org>2007-09-04 17:42:38 +0000
commitba9a064e7a85aa1b7d9f78a6e155c0e678b22378 (patch)
tree15ac6cb19bf7fe11208af8880b035bfd782a9c48 /java/client
parent914603f2dbfc30bb005ea9c875243a7edccf41da (diff)
downloadqpid-python-ba9a064e7a85aa1b7d9f78a6e155c0e678b22378.tar.gz
fixed a race condition in rollback() that leads to intermittant failures of TransactedTest, also modified TransactedTest to be slightly more robust
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@572751 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java24
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java41
2 files changed, 37 insertions, 28 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index af469ee291..bcd9337bcc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -109,6 +109,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/**
*
@@ -221,6 +222,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private final FlowControllingBlockingQueue _queue;
+ /**
+ * Holds the highest received delivery tag.
+ */
+ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+
/** Holds the dispatcher thread for this session. */
private Dispatcher _dispatcher;
@@ -1281,6 +1287,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
+ _highestDeliveryTag.set(message.getDeliverBody().deliveryTag);
_queue.add(message);
}
}
@@ -2553,6 +2560,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Object _lock = new Object();
+ private final AtomicLong _rollbackMark = new AtomicLong(-1);
public Dispatcher()
{
@@ -2609,7 +2617,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
setConnectionStopped(true);
}
- rejectAllMessages(true);
+ _rollbackMark.set(_highestDeliveryTag.get());
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
@@ -2645,7 +2653,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Allow disptacher to start stopped
synchronized (_lock)
{
- while (connectionStopped())
+ while (!_closed.get() && connectionStopped())
{
try
{
@@ -2670,14 +2678,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_lock.wait();
}
- synchronized (_messageDeliveryLock)
+ if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
{
- dispatchMessage(message);
+ rejectMessage(message, true);
}
-
- while (connectionStopped())
+ else
{
- _lock.wait();
+ synchronized (_messageDeliveryLock)
+ {
+ dispatchMessage(message);
+ }
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 929621c496..678474a18b 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -171,34 +171,33 @@ public class TransactedTest extends TestCase
public void testRollback() throws Exception
{
// add some messages
- _logger.info("Send prep A");
- prepProducer1.send(prepSession.createTextMessage("A"));
- _logger.info("Send prep B");
- prepProducer1.send(prepSession.createTextMessage("B"));
- _logger.info("Send prep C");
- prepProducer1.send(prepSession.createTextMessage("C"));
-
- // Quick sleep to ensure all three get pre-fetched
+ _logger.info("Send prep RB_A");
+ prepProducer1.send(prepSession.createTextMessage("RB_A"));
+ _logger.info("Send prep RB_B");
+ prepProducer1.send(prepSession.createTextMessage("RB_B"));
+ _logger.info("Send prep RB_C");
+ prepProducer1.send(prepSession.createTextMessage("RB_C"));
+
+ _logger.info("Sending RB_X RB_Y RB_Z");
+ producer2.send(session.createTextMessage("RB_X"));
+ producer2.send(session.createTextMessage("RB_Y"));
+ producer2.send(session.createTextMessage("RB_Z"));
+ _logger.info("Receiving RB_A RB_B");
+ expect("RB_A", consumer1.receive(1000));
+ expect("RB_B", consumer1.receive(1000));
+ // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it.
+ // Quick sleep to ensure 'RB_C' gets pre-fetched
Thread.sleep(500);
- _logger.info("Sending X Y Z");
- producer2.send(session.createTextMessage("X"));
- producer2.send(session.createTextMessage("Y"));
- producer2.send(session.createTextMessage("Z"));
- _logger.info("Receiving A B");
- expect("A", consumer1.receive(1000));
- expect("B", consumer1.receive(1000));
- // Don't consume 'C' leave it in the prefetch cache to ensure rollback removes it.
-
// rollback
_logger.info("rollback");
session.rollback();
- _logger.info("Receiving A B C");
+ _logger.info("Receiving RB_A RB_B RB_C");
// ensure sent messages are not visible and received messages are requeued
- expect("A", consumer1.receive(1000), true);
- expect("B", consumer1.receive(1000), true);
- expect("C", consumer1.receive(1000), true);
+ expect("RB_A", consumer1.receive(1000), true);
+ expect("RB_B", consumer1.receive(1000), true);
+ expect("RB_C", consumer1.receive(1000), true);
_logger.info("Starting new connection");
testCon.start();