summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-23 10:20:44 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-23 10:20:44 +0000
commitde248153d311b1e0211dfe3230afcb306f3c0192 (patch)
tree30412df8d5fd1d3ef076fba0903301b25f8a7518 /java/client/src
parentf74e4dc27d1655760d0213fd60cc75c272c26f00 (diff)
downloadqpid-python-de248153d311b1e0211dfe3230afcb306f3c0192.tar.gz
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription BROKER AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver. BasicRejectMethodHandler - initial place holder. TxRollbackHandler - Added comment AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue. AMQQueue - added the queue reference to the Subscription creation ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355 DeliveryManager - adjusted deliver call to allow delivery to the head of the queue. Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed. SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription. SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure. SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue. AMQStateManager - Added BasicRejectMethodHandler TransactionalContext - Added option to deliver the messages to the front of the queue. LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue. NonTransactionalContext - Added option to deliver the messages to the front of the queue. DeliverMessageOperation.java DELELTED AS NOT USED. CLIENT AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover. BasicMessageConsumer - updated the rollback so that it sends reject messages to server. AbstractJMSMessage - whitespace + added extra message properties to the toString() AMQProtocolHandler - whitespace + extra debug output TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on. CLUSTER ClusteredQueue - AMQQueue changes for message deliveryFirst. RemoteSubscriptionImpl - Implementation of Subscription SYSTESTS AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst. AMQQueueMBeanTest - changes for message deliveryFirst. ConcurrencyTest - changes for message deliveryFirst. DeliveryManagerTest - changes for message deliveryFirst. SubscriptionTestHelper - Implementation of Subscription WhiteSpace only UnacknowledgedMessageMapImpl.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510897 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java44
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java74
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java375
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java22
6 files changed, 468 insertions, 88 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ad600ddb40..89f596e541 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -255,13 +255,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return _connectionStopped;
}
- void setConnectionStopped(boolean connectionStopped)
+ boolean setConnectionStopped(boolean connectionStopped)
{
+ boolean currently;
synchronized (_lock)
{
+ currently = _connectionStopped;
_connectionStopped = connectionStopped;
_lock.notify();
}
+ return currently;
}
private void dispatchMessage(UnprocessedMessage message)
@@ -543,7 +546,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (!isSuspended)
{
-// suspendChannel(true);
+ suspendChannel(true);
}
_connection.getProtocolHandler().syncWrite(
@@ -556,7 +559,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (!isSuspended)
{
-// suspendChannel(false);
+ suspendChannel(false);
}
}
catch (AMQException e)
@@ -822,10 +825,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean isSuspended = isSuspended();
-// if (!isSuspended)
-// {
-// suspendChannel(true);
-// }
+ if (!isSuspended)
+ {
+ suspendChannel(true);
+ }
for (BasicMessageConsumer consumer : _consumers.values())
{
@@ -841,15 +844,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false) // requeue
, BasicRecoverOkBody.class);
-// if (_dispatcher != null)
-// {
-// _dispatcher.rollback();
-// }
-//
-// if (!isSuspended)
-// {
-// suspendChannel(false);
-// }
+ if (_dispatcher != null)
+ {
+ _dispatcher.rollback();
+ }
+
+ if (!isSuspended)
+ {
+ suspendChannel(false);
+ }
}
catch (AMQException e)
{
@@ -1952,7 +1955,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_dispatcher == null)
{
rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
- }
+ }// if the dispatcher is running we have to do the clean up in the Ok Handler.
}
}
@@ -2171,8 +2174,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
rejectMessagesForConsumerTag(null, requeue);
}
- /** @param consumerTag The consumerTag to prune from queue or all if null
- * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
+ /**
+ * @param consumerTag The consumerTag to prune from queue or all if null
+ * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
*/
private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue)
@@ -2192,7 +2196,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
messages.remove();
-// rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+ rejectMessage(message.getDeliverBody().deliveryTag, requeue);
_logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 496e377435..e9b914425a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -745,28 +745,32 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag);
}
- for (Object o : _synchronousQueue)
+ Iterator iterator = _synchronousQueue.iterator();
+ while (iterator.hasNext())
{
+ Object o = iterator.next();
+
if (o instanceof AbstractJMSMessage)
{
-// _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true);
+ _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true);
if (_logger.isTraceEnabled())
{
_logger.trace("Rejected message" + o);
+ iterator.remove();
}
}
else
{
_logger.error("Queue contained a :" + o.getClass() +
- " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
}
}
if (_synchronousQueue.size() != 0)
{
- _logger.warn("Queue was not empty after rejecting all messages");
+ _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
}
_synchronousQueue.clear();
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 67d74055c6..36dd4d400c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -87,17 +87,17 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
switch (contentType)
{
- case AMQDestination.QUEUE_TYPE:
- dest = new AMQQueue(exchange, routingKey, routingKey);
- break;
+ case AMQDestination.QUEUE_TYPE:
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ break;
- case AMQDestination.TOPIC_TYPE:
- dest = new AMQTopic(exchange, routingKey, null);
- break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = new AMQTopic(exchange, routingKey, null);
+ break;
- default:
- dest = new AMQUndefinedDestination(exchange, routingKey, null);
- break;
+ default:
+ dest = new AMQUndefinedDestination(exchange, routingKey, null);
+ break;
}
//Destination dest = AMQDestination.createDestination(url);
setJMSDestination(dest);
@@ -203,7 +203,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
if (!(destination instanceof AMQDestination))
{
throw new IllegalArgumentException(
- "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
+ "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
}
final AMQDestination amqd = (AMQDestination) destination;
@@ -495,8 +495,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public abstract void clearBodyImpl() throws JMSException;
/**
- * Get a String representation of the body of the message. Used in the
- * toString() method which outputs this before message properties.
+ * Get a String representation of the body of the message. Used in the toString() method which outputs this before
+ * message properties.
*/
public abstract String toBodyString() throws JMSException;
@@ -519,7 +519,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
buf.append("\nJMS priority: ").append(getJMSPriority());
buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode());
buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
+ buf.append("\nJMS Redelivered: ").append(_redelivered);
+ buf.append("\nJMS Destination: ").append(getJMSDestination());
+ buf.append("\nJMS Type: ").append(getJMSType());
+ buf.append("\nJMS MessageID: ").append(getJMSMessageID());
buf.append("\nAMQ message number: ").append(_deliveryTag);
+
buf.append("\nProperties:");
if (getJmsHeaders().isEmpty())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 988a12ee78..d0cc52271a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -65,15 +65,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
/**
- * The connection that this protocol handler is associated with. There is a 1-1
- * mapping between connection instances and protocol handler instances.
+ * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances
+ * and protocol handler instances.
*/
private AMQConnection _connection;
- /**
- * Our wrapper for a protocol session that provides access to session values
- * in a typesafe manner.
- */
+ /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
private volatile AMQProtocolSession _protocolSession;
private AMQStateManager _stateManager = new AMQStateManager();
@@ -120,8 +117,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// we only add the SSL filter where we have an SSL connection
if (_connection.getSSLConfiguration() != null)
{
- SSLConfiguration sslConfig = _connection.getSSLConfiguration();
- SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+ SSLConfiguration sslConfig = _connection.getSSLConfiguration();
+ SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
sslFilter.setUseClientMode(true);
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
@@ -139,7 +136,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
e.printStackTrace();
}
-
+
_protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
@@ -154,6 +151,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* sessionClosed() depending on whether we were trying to send data at the time of failure.
*
* @param session
+ *
* @throws Exception
*/
public void sessionClosed(IoSession session) throws Exception
@@ -208,9 +206,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.info("Protocol Session [" + this + "] closed");
}
- /**
- * See {@link FailoverHandler} to see rationale for separate thread.
- */
+ /** See {@link FailoverHandler} to see rationale for separate thread. */
private void startFailoverThread()
{
Thread failoverThread = new Thread(_failoverHandler);
@@ -267,10 +263,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * There are two cases where we have other threads potentially blocking for events to be handled by this
- * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
- * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
- * react appropriately.
+ * There are two cases where we have other threads potentially blocking for events to be handled by this class.
+ * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
+ * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
*
* @param e the exception to propagate
*/
@@ -306,13 +301,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- switch(bodyFrame.getFrameType())
+ switch (bodyFrame.getFrameType())
{
case AMQMethodBody.TYPE:
if (debug)
{
- _logger.debug("Method frame received: " + frame);
+ _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
}
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
@@ -362,10 +357,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_protocolSession.messageContentBodyReceived(frame.getChannel(),
(ContentBody) bodyFrame);
break;
-
+
case HeartbeatBody.TYPE:
- if(debug)
+ if (debug)
{
_logger.debug("Received heartbeat");
}
@@ -413,8 +408,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * Convenience method that writes a frame to the protocol session. Equivalent
- * to calling getProtocolSession().write().
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
*
* @param frame the frame to write
*/
@@ -429,30 +424,28 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * Convenience method that writes a frame to the protocol session and waits for
- * a particular response. Equivalent to calling getProtocolSession().write() then
- * waiting for the response.
+ * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
+ * calling getProtocolSession().write() then waiting for the response.
*
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener)
+ BlockingMethodFrameListener listener)
throws AMQException
{
return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
}
/**
- * Convenience method that writes a frame to the protocol session and waits for
- * a particular response. Equivalent to calling getProtocolSession().write() then
- * waiting for the response.
+ * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
+ * calling getProtocolSession().write() then waiting for the response.
*
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener, long timeout)
+ BlockingMethodFrameListener listener, long timeout)
throws AMQException
{
try
@@ -477,17 +470,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
- /**
- * More convenient method to write a frame and wait for it's response.
- */
+ /** More convenient method to write a frame and wait for it's response. */
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
{
return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
}
- /**
- * More convenient method to write a frame and wait for it's response.
- */
+ /** More convenient method to write a frame and wait for it's response. */
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
{
return writeCommandFrameAndWaitForReply(frame,
@@ -495,9 +484,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * Convenience method to register an AMQSession with the protocol handler. Registering
- * a session with the protocol handler will ensure that messages are delivered to the
- * consumer(s) on that session.
+ * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol
+ * handler will ensure that messages are delivered to the consumer(s) on that session.
*
* @param channelId the channel id of the session
* @param session the session instance.
@@ -555,17 +543,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
- /**
- * @return the number of bytes read from this protocol session
- */
+ /** @return the number of bytes read from this protocol session */
public long getReadBytes()
{
return _protocolSession.getIoSession().getReadBytes();
}
- /**
- * @return the number of bytes written to this protocol session
- */
+ /** @return the number of bytes written to this protocol session */
public long getWrittenBytes()
{
return _protocolSession.getIoSession().getWrittenBytes();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
new file mode 100644
index 0000000000..0d75a6b968
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+/**
+ * This class tests a number of commits and roll back scenarios
+ *
+ * Assumptions; - Assumes empty Queue
+ */
+public class CommitRollbackTest extends TestCase
+{
+ protected AMQConnection conn;
+ protected final String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue";
+ protected String payload = "xyzzy";
+ private Session _session;
+ private MessageProducer _publisher;
+ private Session _pubSession;
+ private MessageConsumer _consumer;
+ Queue _jmsQueue;
+
+ private static final Logger _logger = Logger.getLogger(CommitRollbackTest.class);
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ newConnection();
+ }
+
+ private void newConnection() throws AMQException, URLSyntaxException, JMSException
+ {
+ conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'");
+
+ _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+
+ _jmsQueue = _session.createQueue(queue);
+ _consumer = _session.createConsumer(_jmsQueue);
+
+ _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+
+ _publisher = _pubSession.createProducer(_pubSession.createQueue(queue));
+
+ conn.start();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ conn.close();
+ TransportConnection.killVMBroker(1);
+ }
+
+ /** PUT a text message, disconnect before commit, confirm it is gone. */
+ public void testPutThenDisconnect() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testPutThenDisconnect";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _logger.info("reconnecting without commit");
+ conn.close();
+
+ newConnection();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+
+ //commit to ensure message is removed from queue
+ _session.commit();
+
+ assertNull("test message was put and disconnected before commit, but is still present", result);
+ }
+
+ /** PUT a text message, disconnect before commit, confirm it is gone. */
+ public void testPutThenCloseDisconnect() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testPutThenDisconnect";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _logger.info("closing publisher without commit");
+ _publisher.close();
+
+ _logger.info("reconnecting without commit");
+ conn.close();
+
+ newConnection();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+
+ //commit to ensure message is removed from queue
+ _session.commit();
+
+ assertNull("test message was put and disconnected before commit, but is still present", result);
+ }
+
+ /**
+ * PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different
+ * session as producer
+ */
+ public void testPutThenRollback() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testPutThenRollback";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _logger.info("rolling back");
+ _pubSession.rollback();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+
+ assertNull("test message was put and rolled back, but is still present", result);
+ }
+
+ /** GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection */
+ public void testGetThenDisconnect() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testGetThenDisconnect";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _pubSession.commit();
+
+ _logger.info("getting test message");
+
+ Message msg = _consumer.receive(1000);
+ assertNotNull("retrieved message is null", msg);
+
+ _logger.info("closing connection");
+ conn.close();
+
+ newConnection();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+
+ _session.commit();
+
+ assertNotNull("test message was consumed and disconnected before commit, but is gone", result);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
+ }
+
+ /**
+ * GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the
+ * same connection but different session as producer
+ */
+ public void testGetThenCloseDisconnect() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testGetThenCloseDisconnect";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _pubSession.commit();
+
+ _logger.info("getting test message");
+
+ Message msg = _consumer.receive(1000);
+ assertNotNull("retrieved message is null", msg);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
+
+ _logger.info("reconnecting without commit");
+ _consumer.close();
+ conn.close();
+
+ newConnection();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+
+ _session.commit();
+
+ assertNotNull("test message was consumed and disconnected before commit, but is gone", result);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
+ }
+
+ /**
+ * GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt
+ * session to the producer
+ */
+ public void testGetThenRollback() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testGetThenDisconnect";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _pubSession.commit();
+
+ _logger.info("getting test message");
+
+ Message msg = _consumer.receive(1000);
+
+ assertNotNull("retrieved message is null", msg);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
+
+ _logger.info("rolling back");
+
+ _session.rollback();
+
+ _logger.info("receiving result");
+
+ Message result = _consumer.receive(1000);
+
+ _session.commit();
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+ }
+
+ /**
+ * GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same
+ * connection but different session as producer
+ */
+ public void testGetThenCloseRollback() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testGetThenCloseRollback";
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _pubSession.commit();
+
+ _logger.info("getting test message");
+
+ Message msg = _consumer.receive(1000);
+
+ assertNotNull("retrieved message is null", msg);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
+
+ _logger.info("Closing consumer");
+ _consumer.close();
+
+ _logger.info("rolling back");
+ _session.rollback();
+
+ _logger.info("receiving result");
+
+ _consumer = _session.createConsumer(_jmsQueue);
+
+ Message result = _consumer.receive(1000);
+
+ _session.commit();
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+ }
+
+
+ /** Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order */
+ public void testSend2ThenRollback() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending two test messages");
+ _publisher.send(_pubSession.createTextMessage("1"));
+ _publisher.send(_pubSession.createTextMessage("2"));
+ _pubSession.commit();
+
+ _logger.info("getting test message");
+ assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText());
+
+ _logger.info("rolling back");
+ _session.rollback();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("1", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+
+ result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("2", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+
+ result = _consumer.receive(1000);
+
+ assertNull("test message should be null", result);
+ }
+
+ public void testSend2ThenCloseAfter1andTryAgain() throws Exception
+ {
+// assertTrue("session is not transacted", _session.getTransacted());
+// assertTrue("session is not transacted", _pubSession.getTransacted());
+//
+// _logger.info("sending two test messages");
+// _publisher.send(_pubSession.createTextMessage("1"));
+// _publisher.send(_pubSession.createTextMessage("2"));
+// _pubSession.commit();
+//
+// _logger.info("getting test message");
+// assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText());
+//
+// _consumer.close();
+//
+// _consumer = _session.createConsumer(_jmsQueue);
+//
+// _logger.info("receiving result");
+// Message result = _consumer.receive(1000);
+// _logger.error("1:" + result);
+//// assertNotNull("test message was consumed and rolled back, but is gone", result);
+//// assertEquals("1" , ((TextMessage) result).getText());
+//// assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+//
+// result = _consumer.receive(1000);
+// _logger.error("2" + result);
+//// assertNotNull("test message was consumed and rolled back, but is gone", result);
+//// assertEquals("2", ((TextMessage) result).getText());
+//// assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
+//
+// result = _consumer.receive(1000);
+// _logger.error("3" + result);
+// assertNull("test message should be null:" + result, result);
+ }
+
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 4296e43f88..94cbb426e5 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -63,12 +63,11 @@ public class TransactedTest extends TestCase
super.setUp();
TransportConnection.createVMBroker(1);
con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test");
- session = con.createSession(true, 0);
+ session = con.createSession(true, Session.SESSION_TRANSACTED);
queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false);
-
consumer1 = session.createConsumer(queue1);
//Dummy just to create the queue.
MessageConsumer consumer2 = session.createConsumer(queue2);
@@ -81,7 +80,6 @@ public class TransactedTest extends TestCase
prepProducer1 = prepSession.createProducer(queue1);
prepCon.start();
-
//add some messages
prepProducer1.send(prepSession.createTextMessage("A"));
prepProducer1.send(prepSession.createTextMessage("B"));
@@ -127,24 +125,33 @@ public class TransactedTest extends TestCase
public void testRollback() throws Exception
{
+ _logger.info("Sending X Y Z");
producer2.send(session.createTextMessage("X"));
producer2.send(session.createTextMessage("Y"));
producer2.send(session.createTextMessage("Z"));
+ _logger.info("Receiving A B");
expect("A", consumer1.receive(1000));
expect("B", consumer1.receive(1000));
- expect("C", consumer1.receive(1000));
+ //Don't consume 'C' leave it in the prefetch cache to ensure rollback removes it.
//rollback
+ _logger.info("rollback");
session.rollback();
+ _logger.info("Receiving A B C");
//ensure sent messages are not visible and received messages are requeued
expect("A", consumer1.receive(1000));
expect("B", consumer1.receive(1000));
expect("C", consumer1.receive(1000));
+
+ _logger.info("Starting new connection");
testCon.start();
testConsumer1 = testSession.createConsumer(queue1);
+ _logger.info("Testing we have no messages left");
assertTrue(null == testConsumer1.receive(1000));
assertTrue(null == testConsumer2.receive(1000));
+
+ session.commit();
}
public void testResendsMsgsAfterSessionClose() throws Exception
@@ -152,7 +159,7 @@ public class TransactedTest extends TestCase
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
- AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("Q3"), false);
+ AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
MessageConsumer consumer = consumerSession.createConsumer(queue3);
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
@@ -225,8 +232,9 @@ public class TransactedTest extends TestCase
private void expect(String text, Message msg) throws JMSException
{
- assertTrue(msg instanceof TextMessage);
- assertEquals(text, ((TextMessage) msg).getText());
+ assertNotNull("Message should not be null", msg);
+ assertTrue("Message should be a text message", msg instanceof TextMessage);
+ assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText());
}
public static junit.framework.Test suite()