diff options
Diffstat (limited to 'java/client')
11 files changed, 604 insertions, 400 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 053d380129..4662f80c5b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -49,6 +49,7 @@ public class AMQQueueBrowser implements QueueBrowser _session = session; _queue = queue; _messageSelector = (messageSelector == null) || (messageSelector.trim().length() == 0) ? null : messageSelector; + // Create Consumer to verify message selector. BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); consumer.close(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 61143eee69..184bc44912 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -25,6 +25,7 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Iterator; import java.util.Map; +import java.util.Arrays; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -294,8 +295,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_dispatcherLogger.isDebugEnabled()) { - _dispatcherLogger.debug("Dispatcher Connection " + (connectionStopped ? "Started" : "Stopped") + - ": Currently " + (currently ? "Started" : "Stopped")); + _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") + + ": Currently " + (currently ? "Stopped" : "Started")); } } return currently; @@ -307,22 +308,31 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); - if (consumer == null) + if (consumer == null || consumer.isClosed()) { if (_dispatcherLogger.isInfoEnabled()) { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + - "[" + message.getDeliverBody().deliveryTag + "] from queue " - + message.getDeliverBody().consumerTag + " without a handler - rejecting(requeue)..."); + if (consumer == null) + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + + "[" + message.getDeliverBody().deliveryTag + "] from queue " + + message.getDeliverBody().consumerTag + + " )without a handler - rejecting(requeue)..."); + } + else + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + + "[" + message.getDeliverBody().deliveryTag + "] from queue " + + " consumer(" + consumer.debugIdentity() + + ") is closed rejecting(requeue)..."); + } } rejectMessage(message, true); } else { - consumer.notifyMessage(message, _channelId); - } } } @@ -354,7 +364,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (BasicMessageConsumer consumer : _consumers.values()) { - consumer.rollback(); + if (!consumer.isNoConsume()) + { + consumer.rollback(); + } + else + { + // should perhaps clear the _SQ here. + //consumer._synchronousQueue.clear(); + consumer.clearReceiveQueue(); + } + + } setConnectionStopped(isStopped); @@ -379,8 +400,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // Reject messages on pre-dispatch queue rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); - // Remove consumer from map. - deregisterConsumer(consumer); + // closeConsumer + consumer.markClosed(); _dispatcher.setConnectionStopped(stopped); @@ -624,6 +645,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void close(long timeout) throws JMSException { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing session: " + this + ":" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + } + // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session synchronized (_connection.getFailoverMutex()) @@ -2063,26 +2089,39 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // Remove the consumer from the map BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); if (consumer != null) - { - if (consumer.isAutoClose()) + { +// fixme this isn't right.. needs to check if _queue contains data for this consumer + if (consumer.isAutoClose())// && _queue.isEmpty()) { consumer.closeWhenNoMessages(true); } - //Clean the Maps up first - //Flush any pending messages for this consumerTag - if (_dispatcher != null) + if (!consumer.isNoConsume()) { - _logger.info("Dispatcher is not null"); + //Clean the Maps up first + //Flush any pending messages for this consumerTag + if (_dispatcher != null) + { + _logger.info("Dispatcher is not null"); + } + else + { + _logger.info("Dispatcher is null so created stopped dispatcher"); + + startDistpatcherIfNecessary(true); + } + + _dispatcher.rejectPending(consumer); } else { - _logger.info("Dispatcher is null so created stopped dispatcher"); + //Just close the consumer + //fixme the CancelOK is being processed before the arriving messages.. + // The dispatcher is still to process them so the server sent in order but the client + // has yet to receive before the close comes in. - startDistpatcherIfNecessary(true); +// consumer.markClosed(); } - - _dispatcher.rejectPending(consumer); } else { @@ -2217,7 +2256,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) { Iterator messages = _queue.iterator(); + if (_logger.isInfoEnabled()) + { + _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + + ") (PDispatchQ) requeue:" + requeue); + if (messages.hasNext()) + { + _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")"); + } + else + { + _logger.info("No messages in _queue to reject"); + } + } while (messages.hasNext()) { UnprocessedMessage message = (UnprocessedMessage) messages.next(); @@ -2239,10 +2291,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); } } - else - { - _logger.error("Pruned pending message for consumer:" + consumerTag); - } } } @@ -2250,9 +2298,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rejectMessage(UnprocessedMessage message, boolean requeue) { - if (_logger.isDebugEnabled()) + if (_logger.isTraceEnabled()) { - _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); + _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); } rejectMessage(message.getDeliverBody().deliveryTag, requeue); @@ -2260,9 +2308,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rejectMessage(AbstractJMSMessage message, boolean requeue) { - if (_logger.isDebugEnabled()) + if (_logger.isTraceEnabled()) { - _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag()); + _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag()); } rejectMessage(message.getDeliveryTag(), requeue); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 9043faa80c..73010ce517 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import java.util.Iterator; import java.util.List; +import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -118,7 +119,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); - /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ + /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>(); /** @@ -135,6 +136,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private boolean _closeWhenNoMessages; private boolean _noConsume; + private List<StackTraceElement> _closedStack = null; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, @@ -157,6 +159,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true); _autoClose = autoClose; _noConsume = noConsume; + + //Force queue browsers not to use acknowledge modes. + if (_noConsume) + { + _acknowledgeMode = Session.NO_ACKNOWLEDGE; + } } public AMQDestination getDestination() @@ -433,6 +441,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { + //synchronized (_closed) + if (_logger.isInfoEnabled()) { _logger.info("Closing consumer:" + debugIdentity()); @@ -442,6 +452,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (!_closed.getAndSet(true)) { + if (_logger.isTraceEnabled()) + { + if (_closedStack != null) + { + _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); + } + else + { + _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6); + } + } if (sendClose) { // TODO: Be aware of possible changes to parameter order as versions change. @@ -467,9 +489,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer throw new JMSException("Error closing consumer: " + e); } } + else + { +// //fixme this probably is not right +// if (!isNoConsume()) + { //done in BasicCancelOK Handler but not sending one so just deregister. + deregisterConsumer(); + } + } - //done in BasicCancelOK Handler - //deregisterConsumer(); if (_messageListener != null && _receiving.get()) { if (_logger.isInfoEnabled()) @@ -488,7 +516,23 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ void markClosed() { - _closed.set(true); +// synchronized (_closed) + { + _closed.set(true); + + if (_logger.isTraceEnabled()) + { + if (_closedStack != null) + { + _logger.trace(_consumerTag + " markClosed():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); + } + else + { + _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8); + } + } + } deregisterConsumer(); } @@ -520,11 +564,24 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); } - jmsMessage.setConsumer(this); +// synchronized (_closed) + { +// if (!_closed.get()) + { + + jmsMessage.setConsumer(this); - preDeliver(jmsMessage); + preDeliver(jmsMessage); - notifyMessage(jmsMessage, channelId); + notifyMessage(jmsMessage, channelId); + } +// else +// { +// _logger.error("MESSAGE REJECTING!"); +// _session.rejectMessage(jmsMessage, true); +// //_logger.error("MESSAGE JUST DROPPED!"); +// } + } } catch (Exception e) { @@ -551,9 +608,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { //we do not need a lock around the test above, and the dispatch below as it is invalid //for an application to alter an installed listener while the session is started - preApplicationProcessing(jmsMessage); - getMessageListener().onMessage(jmsMessage); - postDeliver(jmsMessage); +// synchronized (_closed) + { +// if (!_closed.get()) + { + + preApplicationProcessing(jmsMessage); + getMessageListener().onMessage(jmsMessage); + postDeliver(jmsMessage); + } + } } else { @@ -649,14 +713,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer lastDeliveryTag = _receivedDeliveryTags.poll(); } + assert _receivedDeliveryTags.isEmpty(); + _session.acknowledgeMessage(lastDeliveryTag, true); } } void notifyError(Throwable cause) { - _closed.set(true); - +// synchronized (_closed) + { + _closed.set(true); + if (_logger.isTraceEnabled()) + { + if (_closedStack != null) + { + _logger.trace(_consumerTag + " notifyError():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + _logger.trace(_consumerTag + " previously" + _closedStack.toString()); + } + else + { + _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8); + } + } + } //QPID-293 can "request redelivery of this error through dispatcher" // we have no way of propagating the exception to a message listener - a JMS limitation - so we @@ -761,14 +841,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { clearUnackedMessages(); - if (_logger.isDebugEnabled()) + if (!_receivedDeliveryTags.isEmpty()) { - _logger.debug("Rejecting received messages"); + _logger.debug("Rejecting received messages in _receivedDTs (RQ)"); } //rollback received but not committed messages while (!_receivedDeliveryTags.isEmpty()) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" + + "for consumer with tag:" + _consumerTag); + } + Long tag = _receivedDeliveryTags.poll(); if (tag != null) @@ -782,12 +868,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + if (!_receivedDeliveryTags.isEmpty()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection"); + } + } + //rollback pending messages if (_synchronousQueue.size() > 0) { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ")" + + _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" + "for consumer with tag:" + _consumerTag); } Iterator iterator = _synchronousQueue.iterator(); @@ -821,7 +915,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer rollback(); } - _synchronousQueue.clear(); + clearReceiveQueue(); } } @@ -831,4 +925,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer return String.valueOf(_consumerTag); } + public void clearReceiveQueue() + { + _synchronousQueue.clear(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/Closeable.java b/java/client/src/main/java/org/apache/qpid/client/Closeable.java index 6593f2c254..d246dc3931 100644 --- a/java/client/src/main/java/org/apache/qpid/client/Closeable.java +++ b/java/client/src/main/java/org/apache/qpid/client/Closeable.java @@ -25,20 +25,18 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.IllegalStateException; import javax.jms.JMSException; -/** - * Provides support for orderly shutdown of an object. - */ +/** Provides support for orderly shutdown of an object. */ public abstract class Closeable { /** - * We use an atomic boolean so that we do not have to synchronized access to this flag. Synchronizing - * access to this flag would mean have a synchronized block in every method. + * We use an atomic boolean so that we do not have to synchronized access to this flag. Synchronizing access to this + * flag would mean have a synchronized block in every method. */ protected final AtomicBoolean _closed = new AtomicBoolean(false); protected void checkNotClosed() throws JMSException { - if (_closed.get()) + if (isClosed()) { throw new IllegalStateException("Object " + toString() + " has been closed"); } @@ -46,7 +44,10 @@ public abstract class Closeable public boolean isClosed() { - return _closed.get(); +// synchronized (_closed) + { + return _closed.get(); + } } public abstract void close() throws JMSException; diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java index 794071cc34..0826deb2f4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java @@ -42,6 +42,6 @@ public class ChannelCloseOkMethodHandler implements StateAwareMethodListener { _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId()); - //todo this should do the closure + //todo this should do the local closure } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 19767b6575..e875b4dca8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -51,10 +51,8 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.protocol.AMQConstant; /** - * Wrapper for protocol session that provides type-safe access to session attributes. - * <p/> - * The underlying protocol session is still available but clients should not - * use it to obtain session attributes. + * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol + * session is still available but clients should not use it to obtain session attributes. */ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { @@ -78,27 +76,23 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected WriteFuture _lastWriteFuture; /** - * The handler from which this session was created and which is used to handle protocol events. - * We send failover events to the handler. + * The handler from which this session was created and which is used to handle protocol events. We send failover + * events to the handler. */ protected final AMQProtocolHandler _protocolHandler; - /** - * Maps from the channel id to the AMQSession that it represents. - */ + /** Maps from the channel id to the AMQSession that it represents. */ protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); /** - * Maps from a channel id to an unprocessed message. This is used to tie together the - * JmsDeliverBody (which arrives first) with the subsequent content header and content bodies. + * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives + * first) with the subsequent content header and content bodies. */ protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); - /** - * Counter to ensure unique queue names - */ + /** Counter to ensure unique queue names */ protected int _queueId = 1; protected final Object _queueIdLock = new Object(); @@ -108,8 +102,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession /** - * No-arg constructor for use by test subclass - has to initialise final vars - * NOT intended for use other then for test + * No-arg constructor for use by test subclass - has to initialise final vars NOT intended for use other then for + * test */ public AMQProtocolSession() { @@ -147,7 +141,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { // start the process of setting up the connection. This is the first place that // data is written to the server. - + _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); } @@ -207,8 +201,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession /** * Store the SASL client currently being used for the authentication handshake * - * @param client if non-null, stores this in the session. if null clears any existing client - * being stored + * @param client if non-null, stores this in the session. if null clears any existing client being stored */ public void setSaslClient(SaslClient client) { @@ -237,10 +230,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } /** - * Callback invoked from the BasicDeliverMethodHandler when a message has been received. - * This is invoked on the MINA dispatcher thread. + * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA + * dispatcher thread. * * @param message + * * @throws AMQException if this was not expected */ public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException @@ -295,8 +289,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } /** - * Deliver a message to the appropriate session, removing the unprocessed message - * from our map + * Deliver a message to the appropriate session, removing the unprocessed message from our map * * @param channelId the channel id the message should be delivered to * @param msg the message @@ -309,8 +302,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } /** - * Convenience method that writes a frame to the protocol session. Equivalent - * to calling getProtocolSession().write(). + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). * * @param frame the frame to write */ @@ -377,15 +370,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } /** - * Called from the ChannelClose handler when a channel close frame is received. - * This method decides whether this is a response or an initiation. The latter - * case causes the AMQSession to be closed and an exception to be thrown if + * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is + * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if * appropriate. * * @param channelId the id of the channel (session) - * @return true if the client must respond to the server, i.e. if the server - * initiated the channel close, false if the channel close is just the server - * responding to the client's earlier request to close the channel. + * + * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if + * the channel close is just the server responding to the client's earlier request to close the channel. */ public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException { @@ -450,9 +442,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return new AMQShortString("tmp_" + localAddress + "_" + id); } - /** - * @param delay delay in seconds (not ms) - */ + /** @param delay delay in seconds (not ms) */ void initHeartbeats(int delay) { if (delay > 0) @@ -475,7 +465,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolMajorVersion = versionMajor; _protocolMinorVersion = versionMinor; - _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); + _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); } public byte getProtocolMinorVersion() diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 69042d08ea..8368eee125 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -38,12 +38,10 @@ import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; /** - * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up - * the underlying connector, which currently always uses TCP/IP sockets. It creates the - * "protocol handler" which deals with MINA protocol events. - * <p/> - * Could be extended in future to support different transport types by turning this into concrete class/interface - * combo. + * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying + * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA + * protocol events. <p/> Could be extended in future to support different transport types by turning this into concrete + * class/interface combo. */ public class TransportConnection { @@ -61,22 +59,6 @@ public class TransportConnection private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler"; - static - { - _acceptor = new VmPipeAcceptor(); - - IoServiceConfig config = _acceptor.getDefaultConfig(); - - config.setThreadModel(ReadWriteThreadModel.getInstance()); - } - - public static ITransportConnection getInstance() throws AMQTransportConnectionException - { - AMQBrokerDetails details = new AMQBrokerDetails(); - details.setTransport(BrokerDetails.TCP); - return getInstance(details); - } - public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException { int transport = getTransport(details.getTransport()); @@ -182,7 +164,14 @@ public class TransportConnection public static void createVMBroker(int port) throws AMQVMBrokerCreationException { + if (_acceptor == null) + { + _acceptor = new VmPipeAcceptor(); + IoServiceConfig config = _acceptor.getDefaultConfig(); + + config.setThreadModel(ReadWriteThreadModel.getInstance()); + } if (!_inVmPipeAddress.containsKey(port)) { diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index cb4ef01d25..642b928d81 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -44,6 +44,10 @@ public class FlowControllingBlockingQueue /** We require a separate count so we can track whether we have reached the threshold */ private int _count; + public boolean isEmpty() + { + return _queue.isEmpty(); + } public interface ThresholdListener { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index fe15fa5155..1e50a62fee 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -39,8 +39,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.message.JMSTextMessage; -import org.apache.qpid.testutil.VMBrokerSetup; public class PropertyValueTest extends TestCase implements MessageListener { @@ -59,19 +59,13 @@ public class PropertyValueTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); - try - { - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); - } - catch (Exception e) - { - fail("Unable to initialilse connection: " + e); - } + TransportConnection.createVMBroker(1); } protected void tearDown() throws Exception { super.tearDown(); + TransportConnection.killVMBroker(1); } private void init(AMQConnection connection) throws Exception @@ -91,14 +85,48 @@ public class PropertyValueTest extends TestCase implements MessageListener connection.start(); } - public void test() throws Exception + public void testOnce() { - int count = _count; - send(count); - waitFor(count); - check(); - _logger.info("Completed without failure"); - _connection.close(); + runBatch(1); + } + + public void test50() + { + runBatch(50); + } + + private void runBatch(int runSize) + { + try + { + int run = 0; + while (run < runSize) + { + _logger.error("Run Number:" + run++); + try + { + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); + } + catch (Exception e) + { + fail("Unable to initialilse connection: " + e); + } + + int count = _count; + send(count); + waitFor(count); + check(); + _logger.info("Completed without failure"); + _connection.close(); + + _logger.error("End Run Number:" + (run - 1)); + } + } + catch (Exception e) + { + _logger.error(e.getMessage(), e); + e.printStackTrace(); + } } void send(int count) throws JMSException @@ -138,7 +166,7 @@ public class PropertyValueTest extends TestCase implements MessageListener m.setJMSReplyTo(q); m.setStringProperty("TempQueue", q.toString()); - _logger.info("Message:" + m); + _logger.trace("Message:" + m); Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue")); @@ -150,7 +178,7 @@ public class PropertyValueTest extends TestCase implements MessageListener m.setShortProperty("Short", (short) Short.MAX_VALUE); m.setStringProperty("String", "Test"); - _logger.info("Sending Msg:" + m); + _logger.debug("Sending Msg:" + m); producer.send(m); } } @@ -206,8 +234,11 @@ public class PropertyValueTest extends TestCase implements MessageListener Assert.assertEquals("Check String properties are correctly transported", "Test", m.getStringProperty("String")); } + received.clear(); assertEqual(messages.iterator(), actual.iterator()); + + messages.clear(); } private static void assertEqual(Iterator expected, Iterator actual) @@ -269,11 +300,11 @@ public class PropertyValueTest extends TestCase implements MessageListener { test._count = Integer.parseInt(argv[1]); } - test.test(); + test.testOnce(); } public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(PropertyValueTest.class)); + return new junit.framework.TestSuite(PropertyValueTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index a56bae3d70..7762cb3fe9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -42,6 +42,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.AMQException; +import org.apache.qpid.testutil.QpidClientConnection; import org.apache.log4j.Logger; import org.apache.log4j.Level; @@ -62,14 +63,14 @@ public class MessageRequeueTest extends TestCase private boolean testReception = true; private long[] receieved = new long[numTestMessages + 1]; - private boolean passed=false; + private boolean passed = false; protected void setUp() throws Exception { super.setUp(); TransportConnection.createVMBroker(1); - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); // clear queue @@ -85,21 +86,28 @@ public class MessageRequeueTest extends TestCase { super.tearDown(); - if (!passed) + if (!passed) // clean up { - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); // clear queue conn.consume(queue, consumeTimeout); + + conn.disconnect(); } TransportConnection.killVMBroker(1); } - /** multiple consumers */ + /** + * multiple consumers + * + * @throws javax.jms.JMSException if a JMS problem occurs + * @throws InterruptedException on timeout + */ public void testDrain() throws JMSException, InterruptedException { - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -170,6 +178,7 @@ public class MessageRequeueTest extends TestCase assertEquals(list.toString(), 0, failed); _logger.info("consumed: " + messagesReceived); conn.disconnect(); + passed = true; } /** multiple consumers */ @@ -186,8 +195,8 @@ public class MessageRequeueTest extends TestCase Thread t4 = new Thread(c4); t1.start(); -// t2.start(); -// t3.start(); + t2.start(); + t3.start(); // t4.start(); try @@ -230,7 +239,7 @@ public class MessageRequeueTest extends TestCase } assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); - passed=true; + passed = true; } class Consumer implements Runnable @@ -248,7 +257,7 @@ public class MessageRequeueTest extends TestCase try { _logger.info("consumer-" + id + ": starting"); - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -318,286 +327,51 @@ public class MessageRequeueTest extends TestCase } - public class QpidClientConnection implements ExceptionListener + public void testRequeue() throws JMSException, AMQException, URLSyntaxException { - private boolean transacted = true; - private int ackMode = Session.CLIENT_ACKNOWLEDGE; - private Connection connection; - - private String virtualHost; - private String brokerlist; - private int prefetch; - protected Session session; - protected boolean connected; - - public QpidClientConnection() - { - super(); - setVirtualHost("/test"); - setBrokerList(BROKER); - setPrefetch(5000); - } - - - public void connect() throws JMSException - { - if (!connected) - { - /* - * amqp://[user:pass@][clientid]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - try - { - AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); - _logger.info("connecting to Qpid :" + brokerUrl); - connection = factory.createConnection(); - - // register exception listener - connection.setExceptionListener(this); - - session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); - - - _logger.info("starting connection"); - connection.start(); - - connected = true; - } - catch (URLSyntaxException e) - { - throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); - } - } - } - - public void disconnect() throws JMSException - { - if (connected) - { - session.commit(); - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected"); - } - } - - public void disconnectWithoutCommit() throws JMSException - { - if (connected) - { - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected without commit"); - } - } - - public String getBrokerList() - { - return brokerlist; - } - - public void setBrokerList(String brokerlist) - { - this.brokerlist = brokerlist; - } - - public String getVirtualHost() + int run = 0; + while (run < 10) { - return virtualHost; - } - - public void setVirtualHost(String virtualHost) - { - this.virtualHost = virtualHost; - } - - public void setPrefetch(int prefetch) - { - this.prefetch = prefetch; - } + run++; - - /** override as necessary */ - public void onException(JMSException exception) - { - _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); - } - - public boolean isConnected() - { - return connected; - } - - public Session getSession() - { - return session; - } - - /** - * Put a String as a text messages, repeat n times. A null payload will result in a null message. - * - * @param queueName The queue name to put to - * @param payload the content of the payload - * @param copies the number of messages to put - * - * @throws javax.jms.JMSException any exception that occurs - */ - public void put(String queueName, String payload, int copies) throws JMSException - { - if (!connected) - { - connect(); - } - - _logger.info("putting to queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageProducer sender = session.createProducer(queue); - - for (int i = 0; i < copies; i++) - { - Message m = session.createTextMessage(payload + i); - m.setIntProperty("index", i + 1); - sender.send(m); - } - - session.commit(); - sender.close(); - _logger.info("put " + copies + " copies"); - } - - /** - * GET the top message on a queue. Consumes the message. Accepts timeout value. - * - * @param queueName The quename to get from - * @param readTimeout The timeout to use - * - * @return the content of the text message if any - * - * @throws javax.jms.JMSException any exception that occured - */ - public Message getNextMessage(String queueName, long readTimeout) throws JMSException - { - if (!connected) + if (_logger.isInfoEnabled()) { - connect(); + _logger.info("testRequeue run " + run); } - Queue queue = session.createQueue(queueName); + String virtualHost = "/test"; + String brokerlist = BROKER; + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - final MessageConsumer consumer = session.createConsumer(queue); + Connection conn = new AMQConnection(brokerUrl); + Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue q = session.createQueue(queue); - Message message = consumer.receive(readTimeout); - session.commit(); - consumer.close(); - - Message result; + _logger.debug("Create Consumer"); + MessageConsumer consumer = session.createConsumer(q); - // all messages we consume should be TextMessages - if (message instanceof TextMessage) - { - result = ((TextMessage) message); - } - else if (null == message) + try { - result = null; + Thread.sleep(2000); } - else + catch (InterruptedException e) { - _logger.info("warning: received non-text message"); - result = message; + // } - return result; - } + _logger.debug("Receiving msg"); + Message msg = consumer.receive(1000); - /** - * GET the top message on a queue. Consumes the message. - * - * @param queueName The Queuename to get from - * - * @return The string content of the text message, if any received - * - * @throws javax.jms.JMSException any exception that occurs - */ - public Message getNextMessage(String queueName) throws JMSException - { - return getNextMessage(queueName, 0); - } + assertNotNull("Message should not be null", msg); - /** - * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. - * - * @param queueName The Queue name to consume from - * @param readTimeout The timeout for each consume - * - * @throws javax.jms.JMSException Any exception that occurs during the consume - * @throws InterruptedException If the consume thread was interrupted during a consume. - */ - public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException - { - if (!connected) - { - connect(); - } - - _logger.info("consuming queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageConsumer consumer = session.createConsumer(queue); - int messagesReceived = 0; - - _logger.info("consuming..."); - while ((consumer.receive(readTimeout)) != null) - { - messagesReceived++; - } - session.commit(); + // As we have not ack'd message will be requeued. + _logger.debug("Close Consumer"); consumer.close(); - _logger.info("consumed: " + messagesReceived); - } - } - - - public void testRequeue() throws JMSException, AMQException, URLSyntaxException - { - String virtualHost = "/test"; - String brokerlist = "vm://:1"; - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - Connection conn = new AMQConnection(brokerUrl); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue q = session.createQueue(queue); - - _logger.info("Create Consumer"); - MessageConsumer consumer = session.createConsumer(q); - - try - { - Thread.sleep(2000); - } - catch (InterruptedException e) - { - // + _logger.debug("Close Connection"); + conn.close(); } - - _logger.info("Receiving msg"); - Message msg = consumer.receive(); - - assertNotNull("Message should not be null", msg); - - _logger.info("Close Consumer"); - consumer.close(); - - _logger.info("Close Connection"); - conn.close(); } }
\ No newline at end of file diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java new file mode 100644 index 0000000000..f2afa472ab --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -0,0 +1,268 @@ +package org.apache.qpid.testutil; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.ExceptionListener; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; + +public class QpidClientConnection implements ExceptionListener +{ + + private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); + + private boolean transacted = true; + private int ackMode = Session.CLIENT_ACKNOWLEDGE; + private Connection connection; + + private String virtualHost; + private String brokerlist; + private int prefetch; + protected Session session; + protected boolean connected; + + public QpidClientConnection(String broker) + { + super(); + setVirtualHost("/test"); + setBrokerList(broker); + setPrefetch(5000); + } + + + public void connect() throws JMSException + { + if (!connected) + { + /* + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + try + { + AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); + _logger.info("connecting to Qpid :" + brokerUrl); + connection = factory.createConnection(); + + // register exception listener + connection.setExceptionListener(this); + + session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + + + _logger.info("starting connection"); + connection.start(); + + connected = true; + } + catch (URLSyntaxException e) + { + throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + } + } + } + + public void disconnect() throws JMSException + { + if (connected) + { + session.commit(); + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected"); + } + } + + public void disconnectWithoutCommit() throws JMSException + { + if (connected) + { + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected without commit"); + } + } + + public String getBrokerList() + { + return brokerlist; + } + + public void setBrokerList(String brokerlist) + { + this.brokerlist = brokerlist; + } + + public String getVirtualHost() + { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) + { + this.virtualHost = virtualHost; + } + + public void setPrefetch(int prefetch) + { + this.prefetch = prefetch; + } + + + /** override as necessary */ + public void onException(JMSException exception) + { + _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); + } + + public boolean isConnected() + { + return connected; + } + + public Session getSession() + { + return session; + } + + /** + * Put a String as a text messages, repeat n times. A null payload will result in a null message. + * + * @param queueName The queue name to put to + * @param payload the content of the payload + * @param copies the number of messages to put + * + * @throws javax.jms.JMSException any exception that occurs + */ + public void put(String queueName, String payload, int copies) throws JMSException + { + if (!connected) + { + connect(); + } + + _logger.info("putting to queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageProducer sender = session.createProducer(queue); + + for (int i = 0; i < copies; i++) + { + Message m = session.createTextMessage(payload + i); + m.setIntProperty("index", i + 1); + sender.send(m); + } + + session.commit(); + sender.close(); + _logger.info("put " + copies + " copies"); + } + + /** + * GET the top message on a queue. Consumes the message. Accepts timeout value. + * + * @param queueName The quename to get from + * @param readTimeout The timeout to use + * + * @return the content of the text message if any + * + * @throws javax.jms.JMSException any exception that occured + */ + public Message getNextMessage(String queueName, long readTimeout) throws JMSException + { + if (!connected) + { + connect(); + } + + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(readTimeout); + session.commit(); + consumer.close(); + + Message result; + + // all messages we consume should be TextMessages + if (message instanceof TextMessage) + { + result = ((TextMessage) message); + } + else if (null == message) + { + result = null; + } + else + { + _logger.info("warning: received non-text message"); + result = message; + } + + return result; + } + + /** + * GET the top message on a queue. Consumes the message. + * + * @param queueName The Queuename to get from + * + * @return The string content of the text message, if any received + * + * @throws javax.jms.JMSException any exception that occurs + */ + public Message getNextMessage(String queueName) throws JMSException + { + return getNextMessage(queueName, 0); + } + + /** + * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. + * + * @param queueName The Queue name to consume from + * @param readTimeout The timeout for each consume + * + * @throws javax.jms.JMSException Any exception that occurs during the consume + * @throws InterruptedException If the consume thread was interrupted during a consume. + */ + public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + { + if (!connected) + { + connect(); + } + + _logger.info("consuming queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + int messagesReceived = 0; + + _logger.info("consuming..."); + while ((consumer.receive(readTimeout)) != null) + { + messagesReceived++; + } + + session.commit(); + consumer.close(); + _logger.info("consumed: " + messagesReceived); + } +} + |
