diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-03-06 14:12:47 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-03-06 14:12:47 +0000 |
| commit | 0c0dbd5e00f5b057867546bb336ece262577841c (patch) | |
| tree | 1449a82b945b7e67639a8a7ae157b89b0a09379e /qpid/java/client/src/main | |
| parent | ee52faa7093cf0c3b264ce6922b636cf54825e2a (diff) | |
| download | qpid-python-0c0dbd5e00f5b057867546bb336ece262577841c.tar.gz | |
QPID-403 QPID-346 QPID-355 QPID-386 QPID-389 Updates to fix Transactional Rollback.
QPID-346 Message loss after rollback\recover
QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription
QPID-386 Updated Transactional Tests to cover underlying AMQP/Qpid state.
QPID-389 Prefetched message are not correctly returned to the queue
QPID-403 Implement Basic.Reject
Broker
UnacknowledgedMessage - Added toString for debug
UnacknowledgedMessageMapImpl - Removed resendMessages method as all sending should go via DeliveryManager and Subscription.
AMQChannel - Updated resend and requeue methods so they do not directly write messages to a subscriber. This was violating the suspension state.
- Used a local non-transactional context to requeue messages as the internal requeuing of messages on the broker should not be part of any client transaction.
- Maked messages as resent.
- Removed warnings from IDE about missing JavaDoc text etc.
BasicAckMethodHandler - Added debugging
BasicRecoverMethodHandler - Removed session from the resend call.
BasicRejectMethodHandler - Initial implementation. Hooks left for possible 'resend' bit.
ChannelCloseHandler - Fixed bug where channel wasn't marked as fully closed on reception of a close from the client.
TxRollbackHandler - Removed session from resend call.
AMQMinaProtocolSession - Fixed bug where channel was marked as awaiting closure before it had actually been closed. This causes problems as the close looks up the channel by id again which will return null after it has been marked as awaiting closure.
AMQMessage - Initial implementation of Rejection. Currently inactive in hasInterest() as we are miss-using reject to requeue prefetched messages from the client.
AMQQueue - Removed debug method as it made reading the log very difficult as all the logs had the same line number
ConcurrentSelectorDeliveryManager - Fixed clearAllMessages() as it didn't actually remove the messages.
- Fixed bad logic in getNextMessage when using null subscriber. (as done by clearAllMessages)
- Added more logging messages. Made more frequent logging a trace value.
- Added debugIdentity() method to reduce over head in calculating standard log prefix.
- Allowed messages to be added to the front of the queue.
- Added currentStatus() to an overview of the queue's current state.
SubscriptionImpl - Updated to handle closure correctly (QPID-355)
-Updated the deliver method so it doesn't use a try->finally to do msg.setDeliveredToConsumer() as this would be done even in the event of an error.
- Created an additional logger to log suspension calls rather than through the SI logger which logs a lot of detail.
Client
pom.xml - Excluded older version of log4j that commons-collections exposes.
AMQSession - Added ability for dispatcher to start in stopped state.
- Added dispatcher logger
- Added checks around logging
- Added message rejection if the dispatcher receives a message that it doesn't have a consumer for.
- Updated message rejection to allow the dispatcher to perform the rejection if running this ensures that all queued messages are processed correctly and rejection occurs in order.
- rollback() before calling rollback all pending queued messages must be rejected as rollback will clear unacked map which the rejects caused by rollback() will need.
- fixed closedProducersAndConsumers so that it will rethrow any JMS Exception
- recover() as for rollback() the rejects need to be done before the Recover Call to the broker.
- Allowed delclareExchange to be done synchronously programatically
- Updated confirmConsumerCancelled to use the dispatcher to perform the clean up. This required the creation of the dispatcher in stopped mode so that it does not start and message attempted to be delivered while the subscriber is being cancelled.
BasicMessageConsumer - Updated close not to perform the deregistration. This is done in via BasicCancelOkMethodHandler
- Added guards on logging
- Record all messages that have been received so they can be rejected if rollback occurs. so had to change impl of acknowledgeLastDelivered.
- Updated Rollback to initially reject all received messages that are still unAcked.
- Added a recursive call should the queue not be empty at the end of the rollback.. with a warning.
BasicCancelOkMethodHandler - White space changes to meet style guide. Added guard on logging.
UnprocessedMessage - White space changes to meet style guide.
StateWaiter - Added comment about timeout bug.
FlowControllingBlockingQueue - Tidied imports
RecoverTest - Updated as declareExchange is now Synchronous
ChannelCloseTest - added guard on logging
MessageRequeueTest - Added to better test underlying AMQP/Qpid state QPID-386
StreamMessageTest - Updated as declareExchange is now Synchronous
CommitRollbackTest - added Additional test case to ensure prefetch queue is correctly purged.
TransactedTest - Added logging and additional tests.
Cluster
SimpleClusterTest - updated in line with AMQSession.delcareExchange changes
Common
AMQConstant - Fixed error code 'not allowed' should be 530 not 507.
ConcurrentLinkedMessageQueueAtomicSize - Updated to beable to get the size of messages on the 'head' queue along with additional debug
Systests
ReturnUnroutableMandatoryMessageTest - Updated as declareExchange is now Synchronous
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@515127 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main')
6 files changed, 247 insertions, 128 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 89f596e541..61143eee69 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -198,9 +198,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private final Object _suspensionLock = new Object(); - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ + private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); + private class Dispatcher extends Thread { @@ -212,12 +213,37 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public Dispatcher() { super("Dispatcher-Channel-" + _channelId); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " created"); + } } public void run() { + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " started"); + } + UnprocessedMessage message; + // Allow disptacher to start stopped + synchronized (_lock) + { + while (connectionStopped()) + { + try + { + _lock.wait(); + } + catch (InterruptedException e) + { + // ignore + } + } + } + try { while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null) @@ -243,10 +269,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (InterruptedException e) { - ; + //ignore + } + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); } - - _logger.info("Dispatcher thread terminating for channel " + _channelId); } // only call while holding lock @@ -263,6 +291,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi currently = _connectionStopped; _connectionStopped = connectionStopped; _lock.notify(); + + if (_dispatcherLogger.isDebugEnabled()) + { + _dispatcherLogger.debug("Dispatcher Connection " + (connectionStopped ? "Started" : "Stopped") + + ": Currently " + (currently ? "Started" : "Stopped")); + } } return currently; } @@ -275,9 +309,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumer == null) { - _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring..."); - _logger.warn("Consumers that exist: " + _consumers); - _logger.warn("Session hashcode: " + System.identityHashCode(this)); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + + "[" + message.getDeliverBody().deliveryTag + "] from queue " + + message.getDeliverBody().consumerTag + " without a handler - rejecting(requeue)..."); + } + + rejectMessage(message, true); } else { @@ -311,7 +350,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi rejectAllMessages(true); - _logger.debug("Session Pre Dispatch Queue cleared"); + _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); for (BasicMessageConsumer consumer : _consumers.values()) { @@ -323,20 +362,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - public void rejectPending(AMQShortString consumerTag) + public void rejectPending(BasicMessageConsumer consumer) { synchronized (_lock) { - boolean stopped = connectionStopped(); + boolean stopped = _dispatcher.connectionStopped(); - _dispatcher.setConnectionStopped(false); - - rejectMessagesForConsumerTag(consumerTag, true); - - if (stopped) + if (!stopped) { - _dispatcher.setConnectionStopped(stopped); + _dispatcher.setConnectionStopped(true); } + + // Reject messages on pre-receive queue + consumer.rollback(); + + // Reject messages on pre-dispatch queue + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + + // Remove consumer from map. + deregisterConsumer(consumer); + + _dispatcher.setConnectionStopped(stopped); + } } } @@ -549,14 +596,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi suspendChannel(true); } - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); - if (_dispatcher != null) { _dispatcher.rollback(); } + _connection.getProtocolHandler().syncWrite( + TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + + if (!isSuspended) { suspendChannel(false); @@ -663,14 +711,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi jmse = e; } } - finally + if (jmse != null) { - if (jmse != null) - { - throw jmse; - } + throw jmse; } - } @@ -835,6 +879,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi consumer.clearUnackedMessages(); } + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. @@ -844,11 +893,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false) // requeue , BasicRecoverOkBody.class); - if (_dispatcher != null) - { - _dispatcher.rollback(); - } - if (!isSuspended) { suspendChannel(false); @@ -1223,35 +1267,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return (counter != null) && (counter.get() != 0); } - - public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException + public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException { - declareExchange(name, type, getProtocolHandler()); + declareExchange(name, type, getProtocolHandler(), nowait); } - public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException + private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - false, // nowait - false, // passive - getTicket(), // ticket - type); // type - getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); + declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); } - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException - { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler); - } - - private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException + private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, @@ -1261,7 +1287,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, // durable name, // exchange false, // internal - false, // nowait + nowait, // nowait false, // passive getTicket(), // ticket type); // type @@ -1874,15 +1900,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { + startDistpatcherIfNecessary(false); + } + + synchronized void startDistpatcherIfNecessary(boolean initiallyStopped) + { if (_dispatcher == null) { _dispatcher = new Dispatcher(); _dispatcher.setDaemon(true); + _dispatcher.setConnectionStopped(initiallyStopped); _dispatcher.start(); } else { - _dispatcher.setConnectionStopped(false); + _dispatcher.setConnectionStopped(initiallyStopped); } } @@ -1910,7 +1942,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler); + declareExchange(amqd, protocolHandler, false); AMQShortString queueName = declareQueue(amqd, protocolHandler); @@ -1950,12 +1982,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _destinationConsumerCount.remove(dest); } } - - //ensure we remove the messages from the consumer even if the dispatcher hasn't started - if (_dispatcher == null) - { - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); - }// if the dispatcher is running we have to do the clean up in the Ok Handler. } } @@ -2033,6 +2059,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void confirmConsumerCancelled(AMQShortString consumerTag) { + + // Remove the consumer from the map BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); if (consumer != null) { @@ -2040,26 +2068,33 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { consumer.closeWhenNoMessages(true); } + + //Clean the Maps up first + //Flush any pending messages for this consumerTag + if (_dispatcher != null) + { + _logger.info("Dispatcher is not null"); + } else { - consumer.rollback(); + _logger.info("Dispatcher is null so created stopped dispatcher"); + + startDistpatcherIfNecessary(true); } - } - //Flush any pending messages for this consumerTag - if (_dispatcher != null) - { - _dispatcher.rejectPending(consumerTag); + _dispatcher.rejectPending(consumer); } else { - rejectMessagesForConsumerTag(consumerTag, true); + _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map."); } + + } /* - * I could have combined the last 3 methods, but this way it improves readability - */ + * I could have combined the last 3 methods, but this way it improves readability + */ private AMQTopic checkValidTopic(Topic topic) throws JMSException { if (topic == null) @@ -2189,16 +2224,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag)) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Removing message from _queue:" + message); + _logger.debug("Removing message(" + System.identityHashCode(message) + + ") from _queue DT:" + message.getDeliverBody().deliveryTag); } messages.remove(); - rejectMessage(message.getDeliverBody().deliveryTag, requeue); + rejectMessage(message, requeue); - _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); + } } else { @@ -2207,15 +2246,45 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + + public void rejectMessage(UnprocessedMessage message, boolean requeue) + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); + } + + rejectMessage(message.getDeliverBody().deliveryTag, requeue); + } + + public void rejectMessage(AbstractJMSMessage message, boolean requeue) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag()); + } + rejectMessage(message.getDeliveryTag(), requeue); + + } + public void rejectMessage(long deliveryTag, boolean requeue) { - AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - deliveryTag, - requeue); + if (_acknowledgeMode == CLIENT_ACKNOWLEDGE || + _acknowledgeMode == SESSION_TRANSACTED) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting delivery tag:" + deliveryTag); + } + AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), + getProtocolMinorVersion(), + deliveryTag, + requeue); - _connection.getProtocolHandler().writeFrame(basicRejectBody); + _connection.getProtocolHandler().writeFrame(basicRejectBody); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index e9b914425a..9043faa80c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -109,9 +110,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */ private int _outstanding; - /** Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. */ - private long _lastDeliveryTag; - /** * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding * number of msgs >= _prefetchHigh and disabled at < _prefetchLow @@ -120,6 +118,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ + private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a * receive() is in progress. @@ -432,6 +433,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing consumer:" + debugIdentity()); + } + synchronized (_connection.getFailoverMutex()) { if (!_closed.getAndSet(true)) @@ -448,6 +454,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + + if (_logger.isDebugEnabled()) + { + _logger.debug("CancelOk'd for consumer:" + debugIdentity()); + } + } catch (AMQException e) { @@ -456,11 +468,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - deregisterConsumer(); - _unacknowledgedDeliveryTags.clear(); + //done in BasicCancelOK Handler + //deregisterConsumer(); if (_messageListener != null && _receiving.get()) { - _logger.info("Interrupting thread: " + _receivingThread); + if (_logger.isInfoEnabled()) + { + _logger.info("Interrupting thread: " + _receivingThread); + } _receivingThread.interrupt(); } } @@ -616,7 +631,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { - _lastDeliveryTag = msg.getDeliveryTag(); + _receivedDeliveryTags.add(msg.getDeliveryTag()); } break; } @@ -625,10 +640,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** Acknowledge up to last message delivered (if any). Used when commiting. */ void acknowledgeLastDelivered() { - if (_lastDeliveryTag > 0) + if (!_receivedDeliveryTags.isEmpty()) { - _session.acknowledgeMessage(_lastDeliveryTag, true); - _lastDeliveryTag = -1; + long lastDeliveryTag = _receivedDeliveryTags.poll(); + + while (!_receivedDeliveryTags.isEmpty()) + { + lastDeliveryTag = _receivedDeliveryTags.poll(); + } + + _session.acknowledgeMessage(lastDeliveryTag, true); } } @@ -738,43 +759,76 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void rollback() { + clearUnackedMessages(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting received messages"); + } + //rollback received but not committed messages + while (!_receivedDeliveryTags.isEmpty()) + { + Long tag = _receivedDeliveryTags.poll(); + + if (tag != null) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting tag from _receivedDTs:" + tag); + } + + _session.rejectMessage(tag, true); + } + } + + //rollback pending messages if (_synchronousQueue.size() > 0) { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag); + _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ")" + + "for consumer with tag:" + _consumerTag); } Iterator iterator = _synchronousQueue.iterator(); + while (iterator.hasNext()) { - Object o = iterator.next(); + Object o = iterator.next(); if (o instanceof AbstractJMSMessage) { - _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); + _session.rejectMessage(((AbstractJMSMessage) o), true); if (_logger.isTraceEnabled()) { - _logger.trace("Rejected message" + o); - iterator.remove(); + _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); } + iterator.remove(); } else { _logger.error("Queue contained a :" + o.getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + iterator.remove(); } } if (_synchronousQueue.size() != 0) { _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size()); + rollback(); } _synchronousQueue.clear(); } } + + public String debugIdentity() + { + return String.valueOf(_consumerTag); + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java index 9bd0205977..bd8177feb6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java @@ -28,27 +28,29 @@ import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.protocol.AMQMethodEvent; -/** - * @author Apache Software Foundation - */ public class BasicCancelOkMethodHandler implements StateAwareMethodListener { - private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); - private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); + private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); + private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); + + public static BasicCancelOkMethodHandler getInstance() + { + return _instance; + } + + private BasicCancelOkMethodHandler() + { + } - public static BasicCancelOkMethodHandler getInstance() - { - return _instance; - } + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + { + BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); - private BasicCancelOkMethodHandler() - { - } + if (_logger.isInfoEnabled()) + { + _logger.info("New BasicCancelOk method received for consumer:" + body.consumerTag); + } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - _logger.debug("New BasicCancelOk method received"); - BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); - protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); - } + protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index ddf79ec907..b176df87fe 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -30,13 +30,11 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; /** - * This class contains everything needed to process a JMS message. It assembles the - * deliver body, the content header and the content body/ies. - * - * Note that the actual work of creating a JMS message for the client code's use is done - * outside of the MINA dispatcher thread in order to minimise the amount of work done in - * the MINA dispatcher thread. + * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and + * the content body/ies. * + * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher + * thread in order to minimise the amount of work done in the MINA dispatcher thread. */ public class UnprocessedMessage { @@ -47,9 +45,7 @@ public class UnprocessedMessage private final int _channelId; private ContentHeaderBody _contentHeader; - /** - * List of ContentBody instances. Due to fragmentation you don't know how big this will be in general - */ + /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ private List<ContentBody> _bodies; public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody) @@ -74,9 +70,9 @@ public class UnprocessedMessage { final long payloadSize = body.payload.remaining(); - if(_bodies == null) + if (_bodies == null) { - if(payloadSize == getContentHeader().bodySize) + if (payloadSize == getContentHeader().bodySize) { _bodies = Collections.singletonList(body); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index b2940d73ae..8a0b5e7d84 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -58,6 +58,7 @@ public class StateWaiter implements StateListener { _logger.debug("State " + _state + " not achieved so waiting..."); _monitor.wait(TIME_OUT); + //fixme this won't cause the timeout to exit the loop. need to set _throwable } catch (InterruptedException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 03e7d399ce..cb4ef01d25 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.client.util; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.log4j.Logger; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; |
