diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-23 10:20:44 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-23 10:20:44 +0000 |
commit | de248153d311b1e0211dfe3230afcb306f3c0192 (patch) | |
tree | 30412df8d5fd1d3ef076fba0903301b25f8a7518 /java/client/src | |
parent | f74e4dc27d1655760d0213fd60cc75c272c26f00 (diff) | |
download | qpid-python-de248153d311b1e0211dfe3230afcb306f3c0192.tar.gz |
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages
QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription
BROKER
AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver.
BasicRejectMethodHandler - initial place holder.
TxRollbackHandler - Added comment
AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue.
AMQQueue - added the queue reference to the Subscription creation
ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355
DeliveryManager - adjusted deliver call to allow delivery to the head of the queue.
Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed.
SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription.
SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure.
SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue.
AMQStateManager - Added BasicRejectMethodHandler
TransactionalContext - Added option to deliver the messages to the front of the queue.
LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue.
NonTransactionalContext - Added option to deliver the messages to the front of the queue.
DeliverMessageOperation.java DELELTED AS NOT USED.
CLIENT
AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover.
BasicMessageConsumer - updated the rollback so that it sends reject messages to server.
AbstractJMSMessage - whitespace + added extra message properties to the toString()
AMQProtocolHandler - whitespace + extra debug output
TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on.
CLUSTER
ClusteredQueue - AMQQueue changes for message deliveryFirst.
RemoteSubscriptionImpl - Implementation of Subscription
SYSTESTS
AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst.
AMQQueueMBeanTest - changes for message deliveryFirst.
ConcurrencyTest - changes for message deliveryFirst.
DeliveryManagerTest - changes for message deliveryFirst.
SubscriptionTestHelper - Implementation of Subscription
WhiteSpace only
UnacknowledgedMessageMapImpl.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510897 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
6 files changed, 468 insertions, 88 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ad600ddb40..89f596e541 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -255,13 +255,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _connectionStopped; } - void setConnectionStopped(boolean connectionStopped) + boolean setConnectionStopped(boolean connectionStopped) { + boolean currently; synchronized (_lock) { + currently = _connectionStopped; _connectionStopped = connectionStopped; _lock.notify(); } + return currently; } private void dispatchMessage(UnprocessedMessage message) @@ -543,7 +546,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (!isSuspended) { -// suspendChannel(true); + suspendChannel(true); } _connection.getProtocolHandler().syncWrite( @@ -556,7 +559,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (!isSuspended) { -// suspendChannel(false); + suspendChannel(false); } } catch (AMQException e) @@ -822,10 +825,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean isSuspended = isSuspended(); -// if (!isSuspended) -// { -// suspendChannel(true); -// } + if (!isSuspended) + { + suspendChannel(true); + } for (BasicMessageConsumer consumer : _consumers.values()) { @@ -841,15 +844,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false) // requeue , BasicRecoverOkBody.class); -// if (_dispatcher != null) -// { -// _dispatcher.rollback(); -// } -// -// if (!isSuspended) -// { -// suspendChannel(false); -// } + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + + if (!isSuspended) + { + suspendChannel(false); + } } catch (AMQException e) { @@ -1952,7 +1955,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_dispatcher == null) { rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); - } + }// if the dispatcher is running we have to do the clean up in the Ok Handler. } } @@ -2171,8 +2174,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi rejectMessagesForConsumerTag(null, requeue); } - /** @param consumerTag The consumerTag to prune from queue or all if null - * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) + /** + * @param consumerTag The consumerTag to prune from queue or all if null + * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) */ private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) @@ -2192,7 +2196,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi messages.remove(); -// rejectMessage(message.getDeliverBody().deliveryTag, requeue); + rejectMessage(message.getDeliverBody().deliveryTag, requeue); _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 496e377435..e9b914425a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -745,28 +745,32 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag); } - for (Object o : _synchronousQueue) + Iterator iterator = _synchronousQueue.iterator(); + while (iterator.hasNext()) { + Object o = iterator.next(); + if (o instanceof AbstractJMSMessage) { -// _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); + _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); if (_logger.isTraceEnabled()) { _logger.trace("Rejected message" + o); + iterator.remove(); } } else { _logger.error("Queue contained a :" + o.getClass() + - " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); } } if (_synchronousQueue.size() != 0) { - _logger.warn("Queue was not empty after rejecting all messages"); + _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size()); } _synchronousQueue.clear(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 67d74055c6..36dd4d400c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -87,17 +87,17 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach switch (contentType) { - case AMQDestination.QUEUE_TYPE: - dest = new AMQQueue(exchange, routingKey, routingKey); - break; + case AMQDestination.QUEUE_TYPE: + dest = new AMQQueue(exchange, routingKey, routingKey); + break; - case AMQDestination.TOPIC_TYPE: - dest = new AMQTopic(exchange, routingKey, null); - break; + case AMQDestination.TOPIC_TYPE: + dest = new AMQTopic(exchange, routingKey, null); + break; - default: - dest = new AMQUndefinedDestination(exchange, routingKey, null); - break; + default: + dest = new AMQUndefinedDestination(exchange, routingKey, null); + break; } //Destination dest = AMQDestination.createDestination(url); setJMSDestination(dest); @@ -203,7 +203,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach if (!(destination instanceof AMQDestination)) { throw new IllegalArgumentException( - "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); + "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); } final AMQDestination amqd = (AMQDestination) destination; @@ -495,8 +495,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public abstract void clearBodyImpl() throws JMSException; /** - * Get a String representation of the body of the message. Used in the - * toString() method which outputs this before message properties. + * Get a String representation of the body of the message. Used in the toString() method which outputs this before + * message properties. */ public abstract String toBodyString() throws JMSException; @@ -519,7 +519,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach buf.append("\nJMS priority: ").append(getJMSPriority()); buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode()); buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); + buf.append("\nJMS Redelivered: ").append(_redelivered); + buf.append("\nJMS Destination: ").append(getJMSDestination()); + buf.append("\nJMS Type: ").append(getJMSType()); + buf.append("\nJMS MessageID: ").append(getJMSMessageID()); buf.append("\nAMQ message number: ").append(_deliveryTag); + buf.append("\nProperties:"); if (getJmsHeaders().isEmpty()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 988a12ee78..d0cc52271a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -65,15 +65,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class); /** - * The connection that this protocol handler is associated with. There is a 1-1 - * mapping between connection instances and protocol handler instances. + * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances + * and protocol handler instances. */ private AMQConnection _connection; - /** - * Our wrapper for a protocol session that provides access to session values - * in a typesafe manner. - */ + /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ private volatile AMQProtocolSession _protocolSession; private AMQStateManager _stateManager = new AMQStateManager(); @@ -120,8 +117,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter // we only add the SSL filter where we have an SSL connection if (_connection.getSSLConfiguration() != null) { - SSLConfiguration sslConfig = _connection.getSSLConfiguration(); - SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + SSLConfiguration sslConfig = _connection.getSSLConfiguration(); + SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); sslFilter.setUseClientMode(true); session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); @@ -139,7 +136,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { e.printStackTrace(); } - + _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); _protocolSession.init(); } @@ -154,6 +151,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * sessionClosed() depending on whether we were trying to send data at the time of failure. * * @param session + * * @throws Exception */ public void sessionClosed(IoSession session) throws Exception @@ -208,9 +206,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.info("Protocol Session [" + this + "] closed"); } - /** - * See {@link FailoverHandler} to see rationale for separate thread. - */ + /** See {@link FailoverHandler} to see rationale for separate thread. */ private void startFailoverThread() { Thread failoverThread = new Thread(_failoverHandler); @@ -267,10 +263,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * There are two cases where we have other threads potentially blocking for events to be handled by this - * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a - * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can - * react appropriately. + * There are two cases where we have other threads potentially blocking for events to be handled by this class. + * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type + * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. * * @param e the exception to propagate */ @@ -306,13 +301,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - switch(bodyFrame.getFrameType()) + switch (bodyFrame.getFrameType()) { case AMQMethodBody.TYPE: if (debug) { - _logger.debug("Method frame received: " + frame); + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); } final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); @@ -362,10 +357,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); break; - + case HeartbeatBody.TYPE: - if(debug) + if (debug) { _logger.debug("Received heartbeat"); } @@ -413,8 +408,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method that writes a frame to the protocol session. Equivalent - * to calling getProtocolSession().write(). + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). * * @param frame the frame to write */ @@ -429,30 +424,28 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method that writes a frame to the protocol session and waits for - * a particular response. Equivalent to calling getProtocolSession().write() then - * waiting for the response. + * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to + * calling getProtocolSession().write() then waiting for the response. * * @param frame * @param listener the blocking listener. Note the calling thread will block. */ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener) + BlockingMethodFrameListener listener) throws AMQException { return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); } /** - * Convenience method that writes a frame to the protocol session and waits for - * a particular response. Equivalent to calling getProtocolSession().write() then - * waiting for the response. + * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to + * calling getProtocolSession().write() then waiting for the response. * * @param frame * @param listener the blocking listener. Note the calling thread will block. */ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener, long timeout) + BlockingMethodFrameListener listener, long timeout) throws AMQException { try @@ -477,17 +470,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter } - /** - * More convenient method to write a frame and wait for it's response. - */ + /** More convenient method to write a frame and wait for it's response. */ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException { return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT); } - /** - * More convenient method to write a frame and wait for it's response. - */ + /** More convenient method to write a frame and wait for it's response. */ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException { return writeCommandFrameAndWaitForReply(frame, @@ -495,9 +484,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method to register an AMQSession with the protocol handler. Registering - * a session with the protocol handler will ensure that messages are delivered to the - * consumer(s) on that session. + * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol + * handler will ensure that messages are delivered to the consumer(s) on that session. * * @param channelId the channel id of the session * @param session the session instance. @@ -555,17 +543,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter } - /** - * @return the number of bytes read from this protocol session - */ + /** @return the number of bytes read from this protocol session */ public long getReadBytes() { return _protocolSession.getIoSession().getReadBytes(); } - /** - * @return the number of bytes written to this protocol session - */ + /** @return the number of bytes written to this protocol session */ public long getWrittenBytes() { return _protocolSession.getIoSession().getWrittenBytes(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java new file mode 100644 index 0000000000..0d75a6b968 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.transacted; + +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; + +/** + * This class tests a number of commits and roll back scenarios + * + * Assumptions; - Assumes empty Queue + */ +public class CommitRollbackTest extends TestCase +{ + protected AMQConnection conn; + protected final String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue"; + protected String payload = "xyzzy"; + private Session _session; + private MessageProducer _publisher; + private Session _pubSession; + private MessageConsumer _consumer; + Queue _jmsQueue; + + private static final Logger _logger = Logger.getLogger(CommitRollbackTest.class); + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + newConnection(); + } + + private void newConnection() throws AMQException, URLSyntaxException, JMSException + { + conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"); + + _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + + _jmsQueue = _session.createQueue(queue); + _consumer = _session.createConsumer(_jmsQueue); + + _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + + _publisher = _pubSession.createProducer(_pubSession.createQueue(queue)); + + conn.start(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + + conn.close(); + TransportConnection.killVMBroker(1); + } + + /** PUT a text message, disconnect before commit, confirm it is gone. */ + public void testPutThenDisconnect() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("reconnecting without commit"); + conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + //commit to ensure message is removed from queue + _session.commit(); + + assertNull("test message was put and disconnected before commit, but is still present", result); + } + + /** PUT a text message, disconnect before commit, confirm it is gone. */ + public void testPutThenCloseDisconnect() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("closing publisher without commit"); + _publisher.close(); + + _logger.info("reconnecting without commit"); + conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + //commit to ensure message is removed from queue + _session.commit(); + + assertNull("test message was put and disconnected before commit, but is still present", result); + } + + /** + * PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different + * session as producer + */ + public void testPutThenRollback() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenRollback"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("rolling back"); + _pubSession.rollback(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + assertNull("test message was put and rolled back, but is still present", result); + } + + /** GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection */ + public void testGetThenDisconnect() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(1000); + assertNotNull("retrieved message is null", msg); + + _logger.info("closing connection"); + conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + _session.commit(); + + assertNotNull("test message was consumed and disconnected before commit, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + } + + /** + * GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the + * same connection but different session as producer + */ + public void testGetThenCloseDisconnect() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenCloseDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(1000); + assertNotNull("retrieved message is null", msg); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText()); + + _logger.info("reconnecting without commit"); + _consumer.close(); + conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + _session.commit(); + + assertNotNull("test message was consumed and disconnected before commit, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + } + + /** + * GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt + * session to the producer + */ + public void testGetThenRollback() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(1000); + + assertNotNull("retrieved message is null", msg); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText()); + + _logger.info("rolling back"); + + _session.rollback(); + + _logger.info("receiving result"); + + Message result = _consumer.receive(1000); + + _session.commit(); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + } + + /** + * GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same + * connection but different session as producer + */ + public void testGetThenCloseRollback() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenCloseRollback"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(1000); + + assertNotNull("retrieved message is null", msg); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText()); + + _logger.info("Closing consumer"); + _consumer.close(); + + _logger.info("rolling back"); + _session.rollback(); + + _logger.info("receiving result"); + + _consumer = _session.createConsumer(_jmsQueue); + + Message result = _consumer.receive(1000); + + _session.commit(); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + } + + + /** Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order */ + public void testSend2ThenRollback() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending two test messages"); + _publisher.send(_pubSession.createTextMessage("1")); + _publisher.send(_pubSession.createTextMessage("2")); + _pubSession.commit(); + + _logger.info("getting test message"); + assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); + + _logger.info("rolling back"); + _session.rollback(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + + result = _consumer.receive(1000); + + assertNull("test message should be null", result); + } + + public void testSend2ThenCloseAfter1andTryAgain() throws Exception + { +// assertTrue("session is not transacted", _session.getTransacted()); +// assertTrue("session is not transacted", _pubSession.getTransacted()); +// +// _logger.info("sending two test messages"); +// _publisher.send(_pubSession.createTextMessage("1")); +// _publisher.send(_pubSession.createTextMessage("2")); +// _pubSession.commit(); +// +// _logger.info("getting test message"); +// assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); +// +// _consumer.close(); +// +// _consumer = _session.createConsumer(_jmsQueue); +// +// _logger.info("receiving result"); +// Message result = _consumer.receive(1000); +// _logger.error("1:" + result); +//// assertNotNull("test message was consumed and rolled back, but is gone", result); +//// assertEquals("1" , ((TextMessage) result).getText()); +//// assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); +// +// result = _consumer.receive(1000); +// _logger.error("2" + result); +//// assertNotNull("test message was consumed and rolled back, but is gone", result); +//// assertEquals("2", ((TextMessage) result).getText()); +//// assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); +// +// result = _consumer.receive(1000); +// _logger.error("3" + result); +// assertNull("test message should be null:" + result, result); + } + +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 4296e43f88..94cbb426e5 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -63,12 +63,11 @@ public class TransactedTest extends TestCase super.setUp(); TransportConnection.createVMBroker(1); con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test"); - session = con.createSession(true, 0); + session = con.createSession(true, Session.SESSION_TRANSACTED); queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false); - consumer1 = session.createConsumer(queue1); //Dummy just to create the queue. MessageConsumer consumer2 = session.createConsumer(queue2); @@ -81,7 +80,6 @@ public class TransactedTest extends TestCase prepProducer1 = prepSession.createProducer(queue1); prepCon.start(); - //add some messages prepProducer1.send(prepSession.createTextMessage("A")); prepProducer1.send(prepSession.createTextMessage("B")); @@ -127,24 +125,33 @@ public class TransactedTest extends TestCase public void testRollback() throws Exception { + _logger.info("Sending X Y Z"); producer2.send(session.createTextMessage("X")); producer2.send(session.createTextMessage("Y")); producer2.send(session.createTextMessage("Z")); + _logger.info("Receiving A B"); expect("A", consumer1.receive(1000)); expect("B", consumer1.receive(1000)); - expect("C", consumer1.receive(1000)); + //Don't consume 'C' leave it in the prefetch cache to ensure rollback removes it. //rollback + _logger.info("rollback"); session.rollback(); + _logger.info("Receiving A B C"); //ensure sent messages are not visible and received messages are requeued expect("A", consumer1.receive(1000)); expect("B", consumer1.receive(1000)); expect("C", consumer1.receive(1000)); + + _logger.info("Starting new connection"); testCon.start(); testConsumer1 = testSession.createConsumer(queue1); + _logger.info("Testing we have no messages left"); assertTrue(null == testConsumer1.receive(1000)); assertTrue(null == testConsumer2.receive(1000)); + + session.commit(); } public void testResendsMsgsAfterSessionClose() throws Exception @@ -152,7 +159,7 @@ public class TransactedTest extends TestCase AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); - AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("Q3"), false); + AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false); MessageConsumer consumer = consumerSession.createConsumer(queue3); //force synch to ensure the consumer has resulted in a bound queue ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); @@ -225,8 +232,9 @@ public class TransactedTest extends TestCase private void expect(String text, Message msg) throws JMSException { - assertTrue(msg instanceof TextMessage); - assertEquals(text, ((TextMessage) msg).getText()); + assertNotNull("Message should not be null", msg); + assertTrue("Message should be a text message", msg instanceof TextMessage); + assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText()); } public static junit.framework.Test suite() |