diff options
Diffstat (limited to 'java/client/src')
12 files changed, 1029 insertions, 196 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 89f596e541..61143eee69 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -198,9 +198,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private final Object _suspensionLock = new Object(); - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ + private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); + private class Dispatcher extends Thread { @@ -212,12 +213,37 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public Dispatcher() { super("Dispatcher-Channel-" + _channelId); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " created"); + } } public void run() { + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " started"); + } + UnprocessedMessage message; + // Allow disptacher to start stopped + synchronized (_lock) + { + while (connectionStopped()) + { + try + { + _lock.wait(); + } + catch (InterruptedException e) + { + // ignore + } + } + } + try { while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null) @@ -243,10 +269,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (InterruptedException e) { - ; + //ignore + } + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); } - - _logger.info("Dispatcher thread terminating for channel " + _channelId); } // only call while holding lock @@ -263,6 +291,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi currently = _connectionStopped; _connectionStopped = connectionStopped; _lock.notify(); + + if (_dispatcherLogger.isDebugEnabled()) + { + _dispatcherLogger.debug("Dispatcher Connection " + (connectionStopped ? "Started" : "Stopped") + + ": Currently " + (currently ? "Started" : "Stopped")); + } } return currently; } @@ -275,9 +309,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumer == null) { - _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring..."); - _logger.warn("Consumers that exist: " + _consumers); - _logger.warn("Session hashcode: " + System.identityHashCode(this)); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + + "[" + message.getDeliverBody().deliveryTag + "] from queue " + + message.getDeliverBody().consumerTag + " without a handler - rejecting(requeue)..."); + } + + rejectMessage(message, true); } else { @@ -311,7 +350,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi rejectAllMessages(true); - _logger.debug("Session Pre Dispatch Queue cleared"); + _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); for (BasicMessageConsumer consumer : _consumers.values()) { @@ -323,20 +362,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - public void rejectPending(AMQShortString consumerTag) + public void rejectPending(BasicMessageConsumer consumer) { synchronized (_lock) { - boolean stopped = connectionStopped(); + boolean stopped = _dispatcher.connectionStopped(); - _dispatcher.setConnectionStopped(false); - - rejectMessagesForConsumerTag(consumerTag, true); - - if (stopped) + if (!stopped) { - _dispatcher.setConnectionStopped(stopped); + _dispatcher.setConnectionStopped(true); } + + // Reject messages on pre-receive queue + consumer.rollback(); + + // Reject messages on pre-dispatch queue + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + + // Remove consumer from map. + deregisterConsumer(consumer); + + _dispatcher.setConnectionStopped(stopped); + } } } @@ -549,14 +596,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi suspendChannel(true); } - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); - if (_dispatcher != null) { _dispatcher.rollback(); } + _connection.getProtocolHandler().syncWrite( + TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + + if (!isSuspended) { suspendChannel(false); @@ -663,14 +711,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi jmse = e; } } - finally + if (jmse != null) { - if (jmse != null) - { - throw jmse; - } + throw jmse; } - } @@ -835,6 +879,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi consumer.clearUnackedMessages(); } + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. @@ -844,11 +893,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false) // requeue , BasicRecoverOkBody.class); - if (_dispatcher != null) - { - _dispatcher.rollback(); - } - if (!isSuspended) { suspendChannel(false); @@ -1223,35 +1267,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return (counter != null) && (counter.get() != 0); } - - public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException + public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException { - declareExchange(name, type, getProtocolHandler()); + declareExchange(name, type, getProtocolHandler(), nowait); } - public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException + private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - false, // nowait - false, // passive - getTicket(), // ticket - type); // type - getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); + declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); } - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException - { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler); - } - - private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException + private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, @@ -1261,7 +1287,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, // durable name, // exchange false, // internal - false, // nowait + nowait, // nowait false, // passive getTicket(), // ticket type); // type @@ -1874,15 +1900,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { + startDistpatcherIfNecessary(false); + } + + synchronized void startDistpatcherIfNecessary(boolean initiallyStopped) + { if (_dispatcher == null) { _dispatcher = new Dispatcher(); _dispatcher.setDaemon(true); + _dispatcher.setConnectionStopped(initiallyStopped); _dispatcher.start(); } else { - _dispatcher.setConnectionStopped(false); + _dispatcher.setConnectionStopped(initiallyStopped); } } @@ -1910,7 +1942,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler); + declareExchange(amqd, protocolHandler, false); AMQShortString queueName = declareQueue(amqd, protocolHandler); @@ -1950,12 +1982,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _destinationConsumerCount.remove(dest); } } - - //ensure we remove the messages from the consumer even if the dispatcher hasn't started - if (_dispatcher == null) - { - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); - }// if the dispatcher is running we have to do the clean up in the Ok Handler. } } @@ -2033,6 +2059,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void confirmConsumerCancelled(AMQShortString consumerTag) { + + // Remove the consumer from the map BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); if (consumer != null) { @@ -2040,26 +2068,33 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { consumer.closeWhenNoMessages(true); } + + //Clean the Maps up first + //Flush any pending messages for this consumerTag + if (_dispatcher != null) + { + _logger.info("Dispatcher is not null"); + } else { - consumer.rollback(); + _logger.info("Dispatcher is null so created stopped dispatcher"); + + startDistpatcherIfNecessary(true); } - } - //Flush any pending messages for this consumerTag - if (_dispatcher != null) - { - _dispatcher.rejectPending(consumerTag); + _dispatcher.rejectPending(consumer); } else { - rejectMessagesForConsumerTag(consumerTag, true); + _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map."); } + + } /* - * I could have combined the last 3 methods, but this way it improves readability - */ + * I could have combined the last 3 methods, but this way it improves readability + */ private AMQTopic checkValidTopic(Topic topic) throws JMSException { if (topic == null) @@ -2189,16 +2224,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag)) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Removing message from _queue:" + message); + _logger.debug("Removing message(" + System.identityHashCode(message) + + ") from _queue DT:" + message.getDeliverBody().deliveryTag); } messages.remove(); - rejectMessage(message.getDeliverBody().deliveryTag, requeue); + rejectMessage(message, requeue); - _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); + } } else { @@ -2207,15 +2246,45 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + + public void rejectMessage(UnprocessedMessage message, boolean requeue) + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); + } + + rejectMessage(message.getDeliverBody().deliveryTag, requeue); + } + + public void rejectMessage(AbstractJMSMessage message, boolean requeue) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag()); + } + rejectMessage(message.getDeliveryTag(), requeue); + + } + public void rejectMessage(long deliveryTag, boolean requeue) { - AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - deliveryTag, - requeue); + if (_acknowledgeMode == CLIENT_ACKNOWLEDGE || + _acknowledgeMode == SESSION_TRANSACTED) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting delivery tag:" + deliveryTag); + } + AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), + getProtocolMinorVersion(), + deliveryTag, + requeue); - _connection.getProtocolHandler().writeFrame(basicRejectBody); + _connection.getProtocolHandler().writeFrame(basicRejectBody); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index e9b914425a..9043faa80c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -109,9 +110,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */ private int _outstanding; - /** Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. */ - private long _lastDeliveryTag; - /** * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding * number of msgs >= _prefetchHigh and disabled at < _prefetchLow @@ -120,6 +118,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ + private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a * receive() is in progress. @@ -432,6 +433,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing consumer:" + debugIdentity()); + } + synchronized (_connection.getFailoverMutex()) { if (!_closed.getAndSet(true)) @@ -448,6 +454,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + + if (_logger.isDebugEnabled()) + { + _logger.debug("CancelOk'd for consumer:" + debugIdentity()); + } + } catch (AMQException e) { @@ -456,11 +468,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - deregisterConsumer(); - _unacknowledgedDeliveryTags.clear(); + //done in BasicCancelOK Handler + //deregisterConsumer(); if (_messageListener != null && _receiving.get()) { - _logger.info("Interrupting thread: " + _receivingThread); + if (_logger.isInfoEnabled()) + { + _logger.info("Interrupting thread: " + _receivingThread); + } _receivingThread.interrupt(); } } @@ -616,7 +631,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { - _lastDeliveryTag = msg.getDeliveryTag(); + _receivedDeliveryTags.add(msg.getDeliveryTag()); } break; } @@ -625,10 +640,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** Acknowledge up to last message delivered (if any). Used when commiting. */ void acknowledgeLastDelivered() { - if (_lastDeliveryTag > 0) + if (!_receivedDeliveryTags.isEmpty()) { - _session.acknowledgeMessage(_lastDeliveryTag, true); - _lastDeliveryTag = -1; + long lastDeliveryTag = _receivedDeliveryTags.poll(); + + while (!_receivedDeliveryTags.isEmpty()) + { + lastDeliveryTag = _receivedDeliveryTags.poll(); + } + + _session.acknowledgeMessage(lastDeliveryTag, true); } } @@ -738,43 +759,76 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void rollback() { + clearUnackedMessages(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting received messages"); + } + //rollback received but not committed messages + while (!_receivedDeliveryTags.isEmpty()) + { + Long tag = _receivedDeliveryTags.poll(); + + if (tag != null) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting tag from _receivedDTs:" + tag); + } + + _session.rejectMessage(tag, true); + } + } + + //rollback pending messages if (_synchronousQueue.size() > 0) { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag); + _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ")" + + "for consumer with tag:" + _consumerTag); } Iterator iterator = _synchronousQueue.iterator(); + while (iterator.hasNext()) { - Object o = iterator.next(); + Object o = iterator.next(); if (o instanceof AbstractJMSMessage) { - _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); + _session.rejectMessage(((AbstractJMSMessage) o), true); if (_logger.isTraceEnabled()) { - _logger.trace("Rejected message" + o); - iterator.remove(); + _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); } + iterator.remove(); } else { _logger.error("Queue contained a :" + o.getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + iterator.remove(); } } if (_synchronousQueue.size() != 0) { _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size()); + rollback(); } _synchronousQueue.clear(); } } + + public String debugIdentity() + { + return String.valueOf(_consumerTag); + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java index 9bd0205977..bd8177feb6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java @@ -28,27 +28,29 @@ import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.protocol.AMQMethodEvent; -/** - * @author Apache Software Foundation - */ public class BasicCancelOkMethodHandler implements StateAwareMethodListener { - private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); - private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); + private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); + private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); + + public static BasicCancelOkMethodHandler getInstance() + { + return _instance; + } + + private BasicCancelOkMethodHandler() + { + } - public static BasicCancelOkMethodHandler getInstance() - { - return _instance; - } + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + { + BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); - private BasicCancelOkMethodHandler() - { - } + if (_logger.isInfoEnabled()) + { + _logger.info("New BasicCancelOk method received for consumer:" + body.consumerTag); + } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - _logger.debug("New BasicCancelOk method received"); - BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); - protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); - } + protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index ddf79ec907..b176df87fe 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -30,13 +30,11 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; /** - * This class contains everything needed to process a JMS message. It assembles the - * deliver body, the content header and the content body/ies. - * - * Note that the actual work of creating a JMS message for the client code's use is done - * outside of the MINA dispatcher thread in order to minimise the amount of work done in - * the MINA dispatcher thread. + * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and + * the content body/ies. * + * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher + * thread in order to minimise the amount of work done in the MINA dispatcher thread. */ public class UnprocessedMessage { @@ -47,9 +45,7 @@ public class UnprocessedMessage private final int _channelId; private ContentHeaderBody _contentHeader; - /** - * List of ContentBody instances. Due to fragmentation you don't know how big this will be in general - */ + /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ private List<ContentBody> _bodies; public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody) @@ -74,9 +70,9 @@ public class UnprocessedMessage { final long payloadSize = body.payload.remaining(); - if(_bodies == null) + if (_bodies == null) { - if(payloadSize == getContentHeader().bodySize) + if (payloadSize == getContentHeader().bodySize) { _bodies = Collections.singletonList(body); } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index b2940d73ae..8a0b5e7d84 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -58,6 +58,7 @@ public class StateWaiter implements StateListener { _logger.debug("State " + _state + " not achieved so waiting..."); _monitor.wait(TIME_OUT); + //fixme this won't cause the timeout to exit the loop. need to set _throwable } catch (InterruptedException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 03e7d399ce..cb4ef01d25 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.client.util; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.log4j.Logger; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 338404a431..4667a2b3fa 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -73,7 +73,8 @@ public class RecoverTest extends TestCase Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + // This is the default now AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -130,7 +131,8 @@ public class RecoverTest extends TestCase Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + // This is the default now AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index 3431c56783..51bbe7d0e6 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -109,6 +109,10 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } catch (AMQException e) { + if (_logger.isInfoEnabled()) + { + _logger.info("Exception occured was:" + e.getErrorCode()); + } assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); _connection = newConnection(); @@ -315,15 +319,15 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } catch (JMSException e) { - fail("Creating new connection when:"+e.getMessage()); + fail("Creating new connection when:" + e.getMessage()); } catch (AMQException e) { - fail("Creating new connection when:"+e.getMessage()); + fail("Creating new connection when:" + e.getMessage()); } catch (URLSyntaxException e) { - fail("Creating new connection when:"+e.getMessage()); + fail("Creating new connection when:" + e.getMessage()); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java new file mode 100644 index 0000000000..a56bae3d70 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -0,0 +1,603 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.close; + +import junit.framework.TestCase; + +import java.util.concurrent.atomic.AtomicInteger; + + +import javax.jms.ExceptionListener; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Message; +import javax.jms.TextMessage; +import javax.jms.MessageConsumer; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; +import org.apache.log4j.Level; + +public class MessageRequeueTest extends TestCase +{ + + private static final Logger _logger = Logger.getLogger(MessageRequeueTest.class); + + protected static AtomicInteger consumerIds = new AtomicInteger(0); + protected final Integer numTestMessages = 150; + + protected final int consumeTimeout = 3000; + + protected final String queue = "direct://amq.direct//queue"; + protected String payload = "Message:"; + + protected final String BROKER = "vm://:1"; + private boolean testReception = true; + + private long[] receieved = new long[numTestMessages + 1]; + private boolean passed=false; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); + // load test data + _logger.info("creating test data, " + numTestMessages + " messages"); + conn.put(queue, payload, numTestMessages); + // close this connection + conn.disconnect(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + + if (!passed) + { + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); + } + TransportConnection.killVMBroker(1); + } + + /** multiple consumers */ + public void testDrain() throws JMSException, InterruptedException + { + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + + _logger.info("consuming queue " + queue); + Queue q = conn.getSession().createQueue(queue); + + final MessageConsumer consumer = conn.getSession().createConsumer(q); + int messagesReceived = 0; + + long messageLog[] = new long[numTestMessages + 1]; + + _logger.info("consuming..."); + Message msg = consumer.receive(1000); + while (msg != null) + { + messagesReceived++; + + long dt = ((AbstractJMSMessage) msg).getDeliveryTag(); + + int msgindex = msg.getIntProperty("index"); + if (messageLog[msgindex] != 0) + { + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + + ") more than once."); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + + "DT:" + dt + + "IN:" + msgindex); + } + + if (dt == 0) + { + _logger.error("DT is zero for msg:" + msgindex); + } + + messageLog[msgindex] = dt; + + //get Next message + msg = consumer.receive(1000); + } + + conn.getSession().commit(); + consumer.close(); + assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived); + + int index = 0; + StringBuilder list = new StringBuilder(); + list.append("Failed to receive:"); + int failed = 0; + + for (long b : messageLog) + { + if (b == 0 && index != 0) //delivery tag of zero shouldn't exist + { + _logger.error("Index: " + index + " was not received."); + list.append(" "); + list.append(index); + list.append(":"); + list.append(b); + failed++; + } + + index++; + } + assertEquals(list.toString(), 0, failed); + _logger.info("consumed: " + messagesReceived); + conn.disconnect(); + } + + /** multiple consumers */ + public void testTwoCompetingConsumers() + { + Consumer c1 = new Consumer(); + Consumer c2 = new Consumer(); + Consumer c3 = new Consumer(); + Consumer c4 = new Consumer(); + + Thread t1 = new Thread(c1); + Thread t2 = new Thread(c2); + Thread t3 = new Thread(c3); + Thread t4 = new Thread(c4); + + t1.start(); +// t2.start(); +// t3.start(); +// t4.start(); + + try + { + t1.join(); + t2.join(); + t3.join(); + t4.join(); + } + catch (InterruptedException e) + { + fail("Uanble to join to Consumer theads"); + } + + _logger.info("consumer 1 count is " + c1.getCount()); + _logger.info("consumer 2 count is " + c2.getCount()); + _logger.info("consumer 3 count is " + c3.getCount()); + _logger.info("consumer 4 count is " + c4.getCount()); + + Integer totalConsumed = c1.getCount() + c2.getCount() + c3.getCount() + c4.getCount(); + + // Check all messages were correctly delivered + int index = 0; + StringBuilder list = new StringBuilder(); + list.append("Failed to receive:"); + int failed = 0; + + for (long b : receieved) + { + if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0) + { + _logger.error("Index: " + index + " was not received."); + list.append(" "); + list.append(index); + list.append(":"); + list.append(b); + failed++; + } + index++; + } + assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); + assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); + passed=true; + } + + class Consumer implements Runnable + { + private Integer count = 0; + private Integer id; + + public Consumer() + { + id = consumerIds.addAndGet(1); + } + + public void run() + { + try + { + _logger.info("consumer-" + id + ": starting"); + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + + _logger.info("consumer-" + id + ": connected, consuming..."); + Message result; + do + { + result = conn.getNextMessage(queue, consumeTimeout); + if (result != null) + { + + long dt = ((AbstractJMSMessage) result).getDeliveryTag(); + + if (testReception) + { + int msgindex = result.getIntProperty("index"); + if (receieved[msgindex] != 0) + { + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() + + ") more than once."); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + + "DT:" + dt + + "IN:" + msgindex); + } + + if (dt == 0) + { + _logger.error("DT is zero for msg:" + msgindex); + } + + receieved[msgindex] = dt; + } + + + count++; + if (count % 100 == 0) + { + _logger.info("consumer-" + id + ": got " + result + ", new count is " + count); + } + } + } + while (result != null); + + _logger.info("consumer-" + id + ": complete"); + conn.disconnect(); + + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + public Integer getCount() + { + return count; + } + + public Integer getId() + { + return id; + } + } + + + public class QpidClientConnection implements ExceptionListener + { + private boolean transacted = true; + private int ackMode = Session.CLIENT_ACKNOWLEDGE; + private Connection connection; + + private String virtualHost; + private String brokerlist; + private int prefetch; + protected Session session; + protected boolean connected; + + public QpidClientConnection() + { + super(); + setVirtualHost("/test"); + setBrokerList(BROKER); + setPrefetch(5000); + } + + + public void connect() throws JMSException + { + if (!connected) + { + /* + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + try + { + AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); + _logger.info("connecting to Qpid :" + brokerUrl); + connection = factory.createConnection(); + + // register exception listener + connection.setExceptionListener(this); + + session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + + + _logger.info("starting connection"); + connection.start(); + + connected = true; + } + catch (URLSyntaxException e) + { + throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + } + } + } + + public void disconnect() throws JMSException + { + if (connected) + { + session.commit(); + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected"); + } + } + + public void disconnectWithoutCommit() throws JMSException + { + if (connected) + { + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected without commit"); + } + } + + public String getBrokerList() + { + return brokerlist; + } + + public void setBrokerList(String brokerlist) + { + this.brokerlist = brokerlist; + } + + public String getVirtualHost() + { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) + { + this.virtualHost = virtualHost; + } + + public void setPrefetch(int prefetch) + { + this.prefetch = prefetch; + } + + + /** override as necessary */ + public void onException(JMSException exception) + { + _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); + } + + public boolean isConnected() + { + return connected; + } + + public Session getSession() + { + return session; + } + + /** + * Put a String as a text messages, repeat n times. A null payload will result in a null message. + * + * @param queueName The queue name to put to + * @param payload the content of the payload + * @param copies the number of messages to put + * + * @throws javax.jms.JMSException any exception that occurs + */ + public void put(String queueName, String payload, int copies) throws JMSException + { + if (!connected) + { + connect(); + } + + _logger.info("putting to queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageProducer sender = session.createProducer(queue); + + for (int i = 0; i < copies; i++) + { + Message m = session.createTextMessage(payload + i); + m.setIntProperty("index", i + 1); + sender.send(m); + } + + session.commit(); + sender.close(); + _logger.info("put " + copies + " copies"); + } + + /** + * GET the top message on a queue. Consumes the message. Accepts timeout value. + * + * @param queueName The quename to get from + * @param readTimeout The timeout to use + * + * @return the content of the text message if any + * + * @throws javax.jms.JMSException any exception that occured + */ + public Message getNextMessage(String queueName, long readTimeout) throws JMSException + { + if (!connected) + { + connect(); + } + + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(readTimeout); + session.commit(); + consumer.close(); + + Message result; + + // all messages we consume should be TextMessages + if (message instanceof TextMessage) + { + result = ((TextMessage) message); + } + else if (null == message) + { + result = null; + } + else + { + _logger.info("warning: received non-text message"); + result = message; + } + + return result; + } + + /** + * GET the top message on a queue. Consumes the message. + * + * @param queueName The Queuename to get from + * + * @return The string content of the text message, if any received + * + * @throws javax.jms.JMSException any exception that occurs + */ + public Message getNextMessage(String queueName) throws JMSException + { + return getNextMessage(queueName, 0); + } + + /** + * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. + * + * @param queueName The Queue name to consume from + * @param readTimeout The timeout for each consume + * + * @throws javax.jms.JMSException Any exception that occurs during the consume + * @throws InterruptedException If the consume thread was interrupted during a consume. + */ + public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + { + if (!connected) + { + connect(); + } + + _logger.info("consuming queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + int messagesReceived = 0; + + _logger.info("consuming..."); + while ((consumer.receive(readTimeout)) != null) + { + messagesReceived++; + } + + session.commit(); + consumer.close(); + _logger.info("consumed: " + messagesReceived); + } + } + + + public void testRequeue() throws JMSException, AMQException, URLSyntaxException + { + String virtualHost = "/test"; + String brokerlist = "vm://:1"; + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + + Connection conn = new AMQConnection(brokerUrl); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue q = session.createQueue(queue); + + _logger.info("Create Consumer"); + MessageConsumer consumer = session.createConsumer(q); + + try + { + Thread.sleep(2000); + } + catch (InterruptedException e) + { + // + } + + _logger.info("Receiving msg"); + Message msg = consumer.receive(); + + assertNotNull("Message should not be null", msg); + + _logger.info("Close Consumer"); + consumer.close(); + + _logger.info("Close Connection"); + conn.close(); + } + +}
\ No newline at end of file diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 07ef5f04d4..fb5ea58174 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -80,7 +80,8 @@ public class StreamMessageTest extends TestCase //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + // This is the default now Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 0d75a6b968..2abc139ced 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -43,7 +43,8 @@ import javax.jms.TextMessage; public class CommitRollbackTest extends TestCase { protected AMQConnection conn; - protected final String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue"; + protected String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue"; + protected static int testMethod = 0; protected String payload = "xyzzy"; private Session _session; private MessageProducer _publisher; @@ -57,6 +58,11 @@ public class CommitRollbackTest extends TestCase { super.setUp(); TransportConnection.createVMBroker(1); + + testMethod++; + queue += testMethod; + + newConnection(); } @@ -84,7 +90,11 @@ public class CommitRollbackTest extends TestCase TransportConnection.killVMBroker(1); } - /** PUT a text message, disconnect before commit, confirm it is gone. */ + /** + * PUT a text message, disconnect before commit, confirm it is gone. + * + * @throws Exception On error + */ public void testPutThenDisconnect() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -109,7 +119,11 @@ public class CommitRollbackTest extends TestCase assertNull("test message was put and disconnected before commit, but is still present", result); } - /** PUT a text message, disconnect before commit, confirm it is gone. */ + /** + * PUT a text message, disconnect before commit, confirm it is gone. + * + * @throws Exception On error + */ public void testPutThenCloseDisconnect() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -140,6 +154,8 @@ public class CommitRollbackTest extends TestCase /** * PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different * session as producer + * + * @throws Exception On error */ public void testPutThenRollback() throws Exception { @@ -160,7 +176,11 @@ public class CommitRollbackTest extends TestCase assertNull("test message was put and rolled back, but is still present", result); } - /** GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection */ + /** + * GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection + * + * @throws Exception On error + */ public void testGetThenDisconnect() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -194,6 +214,8 @@ public class CommitRollbackTest extends TestCase /** * GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the * same connection but different session as producer + * + * @throws Exception On error */ public void testGetThenCloseDisconnect() throws Exception { @@ -230,6 +252,8 @@ public class CommitRollbackTest extends TestCase /** * GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt * session to the producer + * + * @throws Exception On error */ public void testGetThenRollback() throws Exception { @@ -266,6 +290,8 @@ public class CommitRollbackTest extends TestCase /** * GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same * connection but different session as producer + * + * @throws Exception On error */ public void testGetThenCloseRollback() throws Exception { @@ -304,7 +330,11 @@ public class CommitRollbackTest extends TestCase } - /** Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order */ + /** + * Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order + * + * @throws Exception On error + */ public void testSend2ThenRollback() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -339,37 +369,41 @@ public class CommitRollbackTest extends TestCase public void testSend2ThenCloseAfter1andTryAgain() throws Exception { -// assertTrue("session is not transacted", _session.getTransacted()); -// assertTrue("session is not transacted", _pubSession.getTransacted()); -// -// _logger.info("sending two test messages"); -// _publisher.send(_pubSession.createTextMessage("1")); -// _publisher.send(_pubSession.createTextMessage("2")); -// _pubSession.commit(); -// -// _logger.info("getting test message"); -// assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); -// -// _consumer.close(); -// -// _consumer = _session.createConsumer(_jmsQueue); -// -// _logger.info("receiving result"); -// Message result = _consumer.receive(1000); -// _logger.error("1:" + result); -//// assertNotNull("test message was consumed and rolled back, but is gone", result); -//// assertEquals("1" , ((TextMessage) result).getText()); -//// assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); -// -// result = _consumer.receive(1000); -// _logger.error("2" + result); -//// assertNotNull("test message was consumed and rolled back, but is gone", result); -//// assertEquals("2", ((TextMessage) result).getText()); -//// assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); -// -// result = _consumer.receive(1000); -// _logger.error("3" + result); -// assertNull("test message should be null:" + result, result); + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending two test messages"); + _publisher.send(_pubSession.createTextMessage("1")); + _publisher.send(_pubSession.createTextMessage("2")); + _pubSession.commit(); + + _logger.info("getting test message"); + Message result = _consumer.receive(1000); + + assertNotNull("Message received should not be null", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); + + + _logger.info("Closing Consumer"); + _consumer.close(); + + _logger.info("Creating New consumer"); + _consumer = _session.createConsumer(_jmsQueue); + + _logger.info("receiving result"); + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertNull("test message should be null:" + result, result); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 94cbb426e5..d994d4c141 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -62,69 +62,125 @@ public class TransactedTest extends TestCase { super.setUp(); TransportConnection.createVMBroker(1); + _logger.info("Create Connection"); con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test"); + + _logger.info("Create Session"); session = con.createSession(true, Session.SESSION_TRANSACTED); + _logger.info("Create Q1"); queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); + _logger.info("Create Q2"); queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false); - + _logger.info("Create Consumer of Q1"); consumer1 = session.createConsumer(queue1); - //Dummy just to create the queue. + //Dummy just to create the queue. + _logger.info("Create Consumer of Q2"); MessageConsumer consumer2 = session.createConsumer(queue2); + _logger.info("Close Consumer of Q2"); consumer2.close(); + + _logger.info("Create producer to Q2"); producer2 = session.createProducer(queue2); + + _logger.info("Start Connection"); con.start(); + _logger.info("Create prep connection"); prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test"); + + _logger.info("Create prep session"); prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + + _logger.info("Create prep producer to Q1"); prepProducer1 = prepSession.createProducer(queue1); + + _logger.info("Create prep connection start"); prepCon.start(); - //add some messages - prepProducer1.send(prepSession.createTextMessage("A")); - prepProducer1.send(prepSession.createTextMessage("B")); - prepProducer1.send(prepSession.createTextMessage("C")); + _logger.info("Create test connection"); testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test"); + _logger.info("Create test session"); testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + _logger.info("Create test consumer of q2"); testConsumer2 = testSession.createConsumer(queue2); - } protected void tearDown() throws Exception { + _logger.info("Close connection"); con.close(); + _logger.info("Close test connection"); testCon.close(); + _logger.info("Close prep connection"); prepCon.close(); + _logger.info("Kill broker"); TransportConnection.killAllVMBrokers(); super.tearDown(); } public void testCommit() throws Exception { + //add some messages + _logger.info("Send prep A"); + prepProducer1.send(prepSession.createTextMessage("A")); + _logger.info("Send prep B"); + prepProducer1.send(prepSession.createTextMessage("B")); + _logger.info("Send prep C"); + prepProducer1.send(prepSession.createTextMessage("C")); + //send and receive some messages + _logger.info("Send X to Q2"); producer2.send(session.createTextMessage("X")); + _logger.info("Send Y to Q2"); producer2.send(session.createTextMessage("Y")); + _logger.info("Send Z to Q2"); producer2.send(session.createTextMessage("Z")); + + + _logger.info("Read A from Q1"); expect("A", consumer1.receive(1000)); + _logger.info("Read B from Q1"); expect("B", consumer1.receive(1000)); + _logger.info("Read C from Q1"); expect("C", consumer1.receive(1000)); //commit + _logger.info("session commit"); session.commit(); + _logger.info("Start test Connection"); testCon.start(); + //ensure sent messages can be received and received messages are gone + _logger.info("Read X from Q2"); expect("X", testConsumer2.receive(1000)); + _logger.info("Read Y from Q2"); expect("Y", testConsumer2.receive(1000)); + _logger.info("Read Z from Q2"); expect("Z", testConsumer2.receive(1000)); + _logger.info("create test session on Q1"); testConsumer1 = testSession.createConsumer(queue1); + _logger.info("Read null from Q1"); assertTrue(null == testConsumer1.receive(1000)); + _logger.info("Read null from Q2"); assertTrue(null == testConsumer2.receive(1000)); } public void testRollback() throws Exception { + //add some messages + _logger.info("Send prep A"); + prepProducer1.send(prepSession.createTextMessage("A")); + _logger.info("Send prep B"); + prepProducer1.send(prepSession.createTextMessage("B")); + _logger.info("Send prep C"); + prepProducer1.send(prepSession.createTextMessage("C")); + + //Quick sleep to ensure all three get pre-fetched + Thread.sleep(500); + _logger.info("Sending X Y Z"); producer2.send(session.createTextMessage("X")); producer2.send(session.createTextMessage("Y")); @@ -140,9 +196,9 @@ public class TransactedTest extends TestCase _logger.info("Receiving A B C"); //ensure sent messages are not visible and received messages are requeued - expect("A", consumer1.receive(1000)); - expect("B", consumer1.receive(1000)); - expect("C", consumer1.receive(1000)); + expect("A", consumer1.receive(1000), true); + expect("B", consumer1.receive(1000), true); + expect("C", consumer1.receive(1000), true); _logger.info("Starting new connection"); testCon.start(); @@ -152,20 +208,22 @@ public class TransactedTest extends TestCase assertTrue(null == testConsumer2.receive(1000)); session.commit(); + + _logger.info("Testing we have no messages left after commit"); + assertTrue(null == testConsumer1.receive(1000)); + assertTrue(null == testConsumer2.receive(1000)); } public void testResendsMsgsAfterSessionClose() throws Exception { AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); - Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false); MessageConsumer consumer = consumerSession.createConsumer(queue3); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); - Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED); MessageProducer producer = producerSession.createProducer(queue3); _logger.info("Sending four messages"); @@ -176,65 +234,77 @@ public class TransactedTest extends TestCase producerSession.commit(); - _logger.info("Starting connection"); con.start(); TextMessage tm = (TextMessage) consumer.receive(); + assertNotNull(tm); + assertEquals("msg1", tm.getText()); - tm.acknowledge(); consumerSession.commit(); - _logger.info("Received and acknowledged first message"); + _logger.info("Received and committed first message"); tm = (TextMessage) consumer.receive(1000); assertNotNull(tm); + assertEquals("msg2", tm.getText()); + tm = (TextMessage) consumer.receive(1000); assertNotNull(tm); + assertEquals("msg3", tm.getText()); + tm = (TextMessage) consumer.receive(1000); assertNotNull(tm); + assertEquals("msg4", tm.getText()); + _logger.info("Received all four messages. Closing connection with three outstanding messages"); consumerSession.close(); - consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); + consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); consumer = consumerSession.createConsumer(queue3); // no ack for last three messages so when I call recover I expect to get three messages back - tm = (TextMessage) consumer.receive(3000); assertNotNull(tm); assertEquals("msg2", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); tm = (TextMessage) consumer.receive(3000); assertNotNull(tm); assertEquals("msg3", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); tm = (TextMessage) consumer.receive(3000); assertNotNull(tm); assertEquals("msg4", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); + + _logger.info("Received redelivery of three messages. Committing"); - _logger.info("Received redelivery of three messages. Acknowledging last message"); - tm.acknowledge(); consumerSession.commit(); - _logger.info("Calling acknowledge with no outstanding messages"); - // all acked so no messages to be delivered + _logger.info("Called commit"); - tm = (TextMessage) consumer.receiveNoWait(); + tm = (TextMessage) consumer.receive(1000); assertNull(tm); + _logger.info("No messages redelivered as is expected"); con.close(); con2.close(); - } - private void expect(String text, Message msg) throws JMSException { + expect(text, msg, false); + } + + private void expect(String text, Message msg, boolean requeued) throws JMSException + { assertNotNull("Message should not be null", msg); assertTrue("Message should be a text message", msg instanceof TextMessage); assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText()); + assertEquals("Message should " + (requeued ? "" : "not") + " be requeued", requeued, msg.getJMSRedelivered()); } public static junit.framework.Test suite() |
