summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java63
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java11
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java164
4 files changed, 191 insertions, 50 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 2e3e417c95..5c565989d7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -91,6 +91,7 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.url.AMQBindingURL;
+import org.apache.mina.common.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1559,6 +1560,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
suspendChannel(true);
}
+ // Let the dispatcher know that all the incomming messages
+ // should be rolled back(reject/release)
+ _rollbackMark.set(_highestDeliveryTag.get());
+
+ syncDispatchQueue();
+
+ _dispatcher.rollback();
+
releaseForRollback();
sendRollback();
@@ -1851,26 +1860,58 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void failoverPrep()
{
- startDispatcherIfNecessary();
syncDispatchQueue();
}
void syncDispatchQueue()
{
- final CountDownLatch signal = new CountDownLatch(1);
- _queue.add(new Dispatchable() {
- public void dispatch(AMQSession ssn)
+ if (Thread.currentThread() == _dispatcherThread)
+ {
+ while (!_closed.get() && !_queue.isEmpty())
{
- signal.countDown();
+ Dispatchable disp;
+ try
+ {
+ disp = (Dispatchable) _queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ // Check just in case _queue becomes empty, it shouldn't but
+ // better than an NPE.
+ if (disp == null)
+ {
+ _logger.debug("_queue became empty during sync.");
+ break;
+ }
+
+ disp.dispatch(AMQSession.this);
}
- });
- try
- {
- signal.await();
}
- catch (InterruptedException e)
+ else
{
- throw new RuntimeException(e);
+ startDispatcherIfNecessary();
+
+ final CountDownLatch signal = new CountDownLatch(1);
+
+ _queue.add(new Dispatchable()
+ {
+ public void dispatch(AMQSession ssn)
+ {
+ signal.countDown();
+ }
+ });
+
+ try
+ {
+ signal.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 0644bd88a8..1587d6a6bf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -414,9 +414,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void releaseForRollback()
{
- startDispatcherIfNecessary();
- syncDispatchQueue();
- _dispatcher.rollback();
getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
_txRangeSet.clear();
_txSize = 0;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index d7196c0abb..bc1453beaf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -195,6 +195,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
public void releaseForRollback()
{
+ // Reject all the messages that have been received in this session and
+ // have not yet been acknowledged. Should look to remove
+ // _deliveredMessageTags and use _txRangeSet as used by 0-10.
+ // Otherwise messages will be able to arrive out of order to a second
+ // consumer on the queue. Whilst this is within the JMS spec it is not
+ // user friendly and avoidable.
while (true)
{
Long tag = _deliveredMessageTags.poll();
@@ -205,11 +211,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
rejectMessage(tag, true);
}
-
- if (_dispatcher != null)
- {
- _dispatcher.rollback();
- }
}
public void rejectMessage(long deliveryTag, boolean requeue)
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
index 39e2b892a9..2efe93eed8 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
@@ -22,64 +22,166 @@ package org.apache.qpid.test.client;
import org.apache.qpid.test.utils.*;
import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import junit.framework.ComparisonFailure;
+import junit.framework.AssertionFailedError;
/**
- * RollbackOrderTest
+ * RollbackOrderTest, QPID-1864, QPID-1871
+ *
+ * Description:
+ *
+ * The problem that this test is exposing is that the dispatcher used to be capable
+ * of holding on to a message when stopped. This ment that when the rollback was
+ * called and the dispatcher stopped it may have hold of a message. So after all
+ * the local queues(preDeliveryQueue, SynchronousQueue, PostDeliveryTagQueue)
+ * have been cleared the client still had a single message, the one the
+ * dispatcher was holding on to.
+ *
+ * As a result the TxRollback operation would run and then release the dispatcher.
+ * Whilst the dispatcher would then proceed to reject the message it was holiding
+ * the Broker would already have resent that message so the rejection would silently
+ * fail.
+ *
+ * And the client would receieve that single message 'early', depending on the
+ * number of messages already recevied when rollback was called.
+ *
+ *
+ * Aims:
+ *
+ * The tests puts 50 messages on to the queue.
+ *
+ * The test then tries to cause the dispatcher to stop whilst it is in the process
+ * of moving a message from the preDeliveryQueue to a consumers sychronousQueue.
+ *
+ * To exercise this path we have 50 message flowing to the client to give the
+ * dispatcher a bit of work to do moving messages.
+ *
+ * Then we loop - 10 times
+ * - Validating that the first message received is always message 1.
+ * - Receive a few more so that there are a few messages to reject.
+ * - call rollback, to try and catch the dispatcher mid process.
+ *
+ * Outcome:
+ *
+ * The hope is that we catch the dispatcher mid process and cause a BasicReject
+ * to fail. Which will be indicated in the log but will also cause that failed
+ * rejected message to be the next to be delivered which will not be message 1
+ * as expected.
+ *
+ * We are testing a race condition here but we can check through the log file if
+ * the race condition occured. However, performing that check will only validate
+ * the problem exists and will not be suitable as part of a system test.
*
*/
-
public class RollbackOrderTest extends QpidTestCase
{
- private Connection conn;
- private Queue queue;
- private Session ssn;
- private MessageProducer prod;
- private MessageConsumer cons;
+ private Connection _connection;
+ private Queue _queue;
+ private Session _session;
+ private MessageConsumer _consumer;
@Override public void setUp() throws Exception
{
super.setUp();
- conn = getConnection();
- conn.start();
- ssn = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
- queue = ssn.createQueue("rollback-order-test-queue");
- prod = ssn.createProducer(queue);
- cons = ssn.createConsumer(queue);
- for (int i = 0; i < 5; i++)
- {
- TextMessage msg = ssn.createTextMessage("message " + (i+1));
- prod.send(msg);
- }
- ssn.commit();
+ _connection = getConnection();
+
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ _queue = _session.createQueue(getTestQueueName());
+ _consumer = _session.createConsumer(_queue);
+
+ //Send more messages so it is more likely that the dispatcher is
+ // processing on rollback.
+ sendMessage(_session, _queue, 50);
+ _session.commit();
+
}
public void testOrderingAfterRollback() throws Exception
{
- for (int i = 0; i < 10; i++)
+ //Start the session now so we
+ _connection.start();
+
+ for (int i = 0; i < 20; i++)
{
- TextMessage msg = (TextMessage) cons.receive();
- assertEquals("message 1", msg.getText());
- ssn.rollback();
+ Message msg = _consumer.receive();
+ assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+ // Pull additional messages through so we have some reject work to do
+ for (int m=0; m < 5 ; m++)
+ {
+ _consumer.receive();
+ }
+
+ System.err.println("ROT-Rollback");
+ _logger.warn("ROT-Rollback");
+ _session.rollback();
}
}
- @Override public void tearDown() throws Exception
+ public void testOrderingAfterRollbackOnMessage() throws Exception
{
- while (true)
+ final CountDownLatch count= new CountDownLatch(20);
+ final Exception exceptions[] = new Exception[20];
+ final AtomicBoolean failed = new AtomicBoolean(false);
+
+ _consumer.setMessageListener(new MessageListener()
{
- Message msg = cons.receiveNoWait();
- if (msg == null)
+
+ public void onMessage(Message message)
{
- break;
+
+ Message msg = message;
+ try
+ {
+ count.countDown();
+ assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+ _session.rollback();
+ }
+ catch (JMSException e)
+ {
+ exceptions[(int)count.getCount()] = e;
+ }
+ catch (AssertionFailedError cf)
+ {
+ // End Test if Equality test fails
+ while (count.getCount() != 0)
+ {
+ count.countDown();
+ }
+
+ System.err.println(cf.getMessage());
+ cf.printStackTrace();
+ failed.set(true);
+ }
}
- else
+ });
+ //Start the session now so we
+ _connection.start();
+
+ count.await();
+
+ for (Exception e : exceptions)
+ {
+ if (e != null)
{
- msg.acknowledge();
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ failed.set(true);
}
}
- ssn.commit();
+
+ assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get());
+ }
+
+ @Override public void tearDown() throws Exception
+ {
+ drainQueue(_queue);
+
super.tearDown();
}