diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-02-23 10:20:44 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-23 10:20:44 +0000 |
| commit | de248153d311b1e0211dfe3230afcb306f3c0192 (patch) | |
| tree | 30412df8d5fd1d3ef076fba0903301b25f8a7518 /java/client/src/main | |
| parent | f74e4dc27d1655760d0213fd60cc75c272c26f00 (diff) | |
| download | qpid-python-de248153d311b1e0211dfe3230afcb306f3c0192.tar.gz | |
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages
QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription
BROKER
AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver.
BasicRejectMethodHandler - initial place holder.
TxRollbackHandler - Added comment
AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue.
AMQQueue - added the queue reference to the Subscription creation
ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355
DeliveryManager - adjusted deliver call to allow delivery to the head of the queue.
Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed.
SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription.
SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure.
SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue.
AMQStateManager - Added BasicRejectMethodHandler
TransactionalContext - Added option to deliver the messages to the front of the queue.
LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue.
NonTransactionalContext - Added option to deliver the messages to the front of the queue.
DeliverMessageOperation.java DELELTED AS NOT USED.
CLIENT
AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover.
BasicMessageConsumer - updated the rollback so that it sends reject messages to server.
AbstractJMSMessage - whitespace + added extra message properties to the toString()
AMQProtocolHandler - whitespace + extra debug output
TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on.
CLUSTER
ClusteredQueue - AMQQueue changes for message deliveryFirst.
RemoteSubscriptionImpl - Implementation of Subscription
SYSTESTS
AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst.
AMQQueueMBeanTest - changes for message deliveryFirst.
ConcurrencyTest - changes for message deliveryFirst.
DeliveryManagerTest - changes for message deliveryFirst.
SubscriptionTestHelper - Implementation of Subscription
WhiteSpace only
UnacknowledgedMessageMapImpl.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510897 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
4 files changed, 78 insertions, 81 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ad600ddb40..89f596e541 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -255,13 +255,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _connectionStopped; } - void setConnectionStopped(boolean connectionStopped) + boolean setConnectionStopped(boolean connectionStopped) { + boolean currently; synchronized (_lock) { + currently = _connectionStopped; _connectionStopped = connectionStopped; _lock.notify(); } + return currently; } private void dispatchMessage(UnprocessedMessage message) @@ -543,7 +546,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (!isSuspended) { -// suspendChannel(true); + suspendChannel(true); } _connection.getProtocolHandler().syncWrite( @@ -556,7 +559,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (!isSuspended) { -// suspendChannel(false); + suspendChannel(false); } } catch (AMQException e) @@ -822,10 +825,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean isSuspended = isSuspended(); -// if (!isSuspended) -// { -// suspendChannel(true); -// } + if (!isSuspended) + { + suspendChannel(true); + } for (BasicMessageConsumer consumer : _consumers.values()) { @@ -841,15 +844,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false) // requeue , BasicRecoverOkBody.class); -// if (_dispatcher != null) -// { -// _dispatcher.rollback(); -// } -// -// if (!isSuspended) -// { -// suspendChannel(false); -// } + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + + if (!isSuspended) + { + suspendChannel(false); + } } catch (AMQException e) { @@ -1952,7 +1955,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_dispatcher == null) { rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); - } + }// if the dispatcher is running we have to do the clean up in the Ok Handler. } } @@ -2171,8 +2174,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi rejectMessagesForConsumerTag(null, requeue); } - /** @param consumerTag The consumerTag to prune from queue or all if null - * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) + /** + * @param consumerTag The consumerTag to prune from queue or all if null + * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) */ private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) @@ -2192,7 +2196,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi messages.remove(); -// rejectMessage(message.getDeliverBody().deliveryTag, requeue); + rejectMessage(message.getDeliverBody().deliveryTag, requeue); _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 496e377435..e9b914425a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -745,28 +745,32 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag); } - for (Object o : _synchronousQueue) + Iterator iterator = _synchronousQueue.iterator(); + while (iterator.hasNext()) { + Object o = iterator.next(); + if (o instanceof AbstractJMSMessage) { -// _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); + _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); if (_logger.isTraceEnabled()) { _logger.trace("Rejected message" + o); + iterator.remove(); } } else { _logger.error("Queue contained a :" + o.getClass() + - " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); } } if (_synchronousQueue.size() != 0) { - _logger.warn("Queue was not empty after rejecting all messages"); + _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size()); } _synchronousQueue.clear(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 67d74055c6..36dd4d400c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -87,17 +87,17 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach switch (contentType) { - case AMQDestination.QUEUE_TYPE: - dest = new AMQQueue(exchange, routingKey, routingKey); - break; + case AMQDestination.QUEUE_TYPE: + dest = new AMQQueue(exchange, routingKey, routingKey); + break; - case AMQDestination.TOPIC_TYPE: - dest = new AMQTopic(exchange, routingKey, null); - break; + case AMQDestination.TOPIC_TYPE: + dest = new AMQTopic(exchange, routingKey, null); + break; - default: - dest = new AMQUndefinedDestination(exchange, routingKey, null); - break; + default: + dest = new AMQUndefinedDestination(exchange, routingKey, null); + break; } //Destination dest = AMQDestination.createDestination(url); setJMSDestination(dest); @@ -203,7 +203,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach if (!(destination instanceof AMQDestination)) { throw new IllegalArgumentException( - "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); + "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); } final AMQDestination amqd = (AMQDestination) destination; @@ -495,8 +495,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public abstract void clearBodyImpl() throws JMSException; /** - * Get a String representation of the body of the message. Used in the - * toString() method which outputs this before message properties. + * Get a String representation of the body of the message. Used in the toString() method which outputs this before + * message properties. */ public abstract String toBodyString() throws JMSException; @@ -519,7 +519,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach buf.append("\nJMS priority: ").append(getJMSPriority()); buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode()); buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); + buf.append("\nJMS Redelivered: ").append(_redelivered); + buf.append("\nJMS Destination: ").append(getJMSDestination()); + buf.append("\nJMS Type: ").append(getJMSType()); + buf.append("\nJMS MessageID: ").append(getJMSMessageID()); buf.append("\nAMQ message number: ").append(_deliveryTag); + buf.append("\nProperties:"); if (getJmsHeaders().isEmpty()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 988a12ee78..d0cc52271a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -65,15 +65,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class); /** - * The connection that this protocol handler is associated with. There is a 1-1 - * mapping between connection instances and protocol handler instances. + * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances + * and protocol handler instances. */ private AMQConnection _connection; - /** - * Our wrapper for a protocol session that provides access to session values - * in a typesafe manner. - */ + /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ private volatile AMQProtocolSession _protocolSession; private AMQStateManager _stateManager = new AMQStateManager(); @@ -120,8 +117,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter // we only add the SSL filter where we have an SSL connection if (_connection.getSSLConfiguration() != null) { - SSLConfiguration sslConfig = _connection.getSSLConfiguration(); - SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + SSLConfiguration sslConfig = _connection.getSSLConfiguration(); + SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); sslFilter.setUseClientMode(true); session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); @@ -139,7 +136,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { e.printStackTrace(); } - + _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); _protocolSession.init(); } @@ -154,6 +151,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * sessionClosed() depending on whether we were trying to send data at the time of failure. * * @param session + * * @throws Exception */ public void sessionClosed(IoSession session) throws Exception @@ -208,9 +206,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.info("Protocol Session [" + this + "] closed"); } - /** - * See {@link FailoverHandler} to see rationale for separate thread. - */ + /** See {@link FailoverHandler} to see rationale for separate thread. */ private void startFailoverThread() { Thread failoverThread = new Thread(_failoverHandler); @@ -267,10 +263,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * There are two cases where we have other threads potentially blocking for events to be handled by this - * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a - * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can - * react appropriately. + * There are two cases where we have other threads potentially blocking for events to be handled by this class. + * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type + * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. * * @param e the exception to propagate */ @@ -306,13 +301,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - switch(bodyFrame.getFrameType()) + switch (bodyFrame.getFrameType()) { case AMQMethodBody.TYPE: if (debug) { - _logger.debug("Method frame received: " + frame); + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); } final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); @@ -362,10 +357,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); break; - + case HeartbeatBody.TYPE: - if(debug) + if (debug) { _logger.debug("Received heartbeat"); } @@ -413,8 +408,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method that writes a frame to the protocol session. Equivalent - * to calling getProtocolSession().write(). + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). * * @param frame the frame to write */ @@ -429,30 +424,28 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method that writes a frame to the protocol session and waits for - * a particular response. Equivalent to calling getProtocolSession().write() then - * waiting for the response. + * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to + * calling getProtocolSession().write() then waiting for the response. * * @param frame * @param listener the blocking listener. Note the calling thread will block. */ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener) + BlockingMethodFrameListener listener) throws AMQException { return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); } /** - * Convenience method that writes a frame to the protocol session and waits for - * a particular response. Equivalent to calling getProtocolSession().write() then - * waiting for the response. + * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to + * calling getProtocolSession().write() then waiting for the response. * * @param frame * @param listener the blocking listener. Note the calling thread will block. */ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener, long timeout) + BlockingMethodFrameListener listener, long timeout) throws AMQException { try @@ -477,17 +470,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter } - /** - * More convenient method to write a frame and wait for it's response. - */ + /** More convenient method to write a frame and wait for it's response. */ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException { return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT); } - /** - * More convenient method to write a frame and wait for it's response. - */ + /** More convenient method to write a frame and wait for it's response. */ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException { return writeCommandFrameAndWaitForReply(frame, @@ -495,9 +484,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method to register an AMQSession with the protocol handler. Registering - * a session with the protocol handler will ensure that messages are delivered to the - * consumer(s) on that session. + * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol + * handler will ensure that messages are delivered to the consumer(s) on that session. * * @param channelId the channel id of the session * @param session the session instance. @@ -555,17 +543,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter } - /** - * @return the number of bytes read from this protocol session - */ + /** @return the number of bytes read from this protocol session */ public long getReadBytes() { return _protocolSession.getIoSession().getReadBytes(); } - /** - * @return the number of bytes written to this protocol session - */ + /** @return the number of bytes written to this protocol session */ public long getWrittenBytes() { return _protocolSession.getIoSession().getWrittenBytes(); |
