summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-23 10:20:44 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-23 10:20:44 +0000
commitde248153d311b1e0211dfe3230afcb306f3c0192 (patch)
tree30412df8d5fd1d3ef076fba0903301b25f8a7518 /java/client/src/main
parentf74e4dc27d1655760d0213fd60cc75c272c26f00 (diff)
downloadqpid-python-de248153d311b1e0211dfe3230afcb306f3c0192.tar.gz
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription BROKER AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver. BasicRejectMethodHandler - initial place holder. TxRollbackHandler - Added comment AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue. AMQQueue - added the queue reference to the Subscription creation ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355 DeliveryManager - adjusted deliver call to allow delivery to the head of the queue. Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed. SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription. SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure. SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue. AMQStateManager - Added BasicRejectMethodHandler TransactionalContext - Added option to deliver the messages to the front of the queue. LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue. NonTransactionalContext - Added option to deliver the messages to the front of the queue. DeliverMessageOperation.java DELELTED AS NOT USED. CLIENT AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover. BasicMessageConsumer - updated the rollback so that it sends reject messages to server. AbstractJMSMessage - whitespace + added extra message properties to the toString() AMQProtocolHandler - whitespace + extra debug output TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on. CLUSTER ClusteredQueue - AMQQueue changes for message deliveryFirst. RemoteSubscriptionImpl - Implementation of Subscription SYSTESTS AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst. AMQQueueMBeanTest - changes for message deliveryFirst. ConcurrencyTest - changes for message deliveryFirst. DeliveryManagerTest - changes for message deliveryFirst. SubscriptionTestHelper - Implementation of Subscription WhiteSpace only UnacknowledgedMessageMapImpl.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510897 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java44
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java74
4 files changed, 78 insertions, 81 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ad600ddb40..89f596e541 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -255,13 +255,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return _connectionStopped;
}
- void setConnectionStopped(boolean connectionStopped)
+ boolean setConnectionStopped(boolean connectionStopped)
{
+ boolean currently;
synchronized (_lock)
{
+ currently = _connectionStopped;
_connectionStopped = connectionStopped;
_lock.notify();
}
+ return currently;
}
private void dispatchMessage(UnprocessedMessage message)
@@ -543,7 +546,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (!isSuspended)
{
-// suspendChannel(true);
+ suspendChannel(true);
}
_connection.getProtocolHandler().syncWrite(
@@ -556,7 +559,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (!isSuspended)
{
-// suspendChannel(false);
+ suspendChannel(false);
}
}
catch (AMQException e)
@@ -822,10 +825,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean isSuspended = isSuspended();
-// if (!isSuspended)
-// {
-// suspendChannel(true);
-// }
+ if (!isSuspended)
+ {
+ suspendChannel(true);
+ }
for (BasicMessageConsumer consumer : _consumers.values())
{
@@ -841,15 +844,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false) // requeue
, BasicRecoverOkBody.class);
-// if (_dispatcher != null)
-// {
-// _dispatcher.rollback();
-// }
-//
-// if (!isSuspended)
-// {
-// suspendChannel(false);
-// }
+ if (_dispatcher != null)
+ {
+ _dispatcher.rollback();
+ }
+
+ if (!isSuspended)
+ {
+ suspendChannel(false);
+ }
}
catch (AMQException e)
{
@@ -1952,7 +1955,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_dispatcher == null)
{
rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
- }
+ }// if the dispatcher is running we have to do the clean up in the Ok Handler.
}
}
@@ -2171,8 +2174,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
rejectMessagesForConsumerTag(null, requeue);
}
- /** @param consumerTag The consumerTag to prune from queue or all if null
- * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
+ /**
+ * @param consumerTag The consumerTag to prune from queue or all if null
+ * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
*/
private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue)
@@ -2192,7 +2196,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
messages.remove();
-// rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+ rejectMessage(message.getDeliverBody().deliveryTag, requeue);
_logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 496e377435..e9b914425a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -745,28 +745,32 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag);
}
- for (Object o : _synchronousQueue)
+ Iterator iterator = _synchronousQueue.iterator();
+ while (iterator.hasNext())
{
+ Object o = iterator.next();
+
if (o instanceof AbstractJMSMessage)
{
-// _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true);
+ _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true);
if (_logger.isTraceEnabled())
{
_logger.trace("Rejected message" + o);
+ iterator.remove();
}
}
else
{
_logger.error("Queue contained a :" + o.getClass() +
- " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
}
}
if (_synchronousQueue.size() != 0)
{
- _logger.warn("Queue was not empty after rejecting all messages");
+ _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
}
_synchronousQueue.clear();
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 67d74055c6..36dd4d400c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -87,17 +87,17 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
switch (contentType)
{
- case AMQDestination.QUEUE_TYPE:
- dest = new AMQQueue(exchange, routingKey, routingKey);
- break;
+ case AMQDestination.QUEUE_TYPE:
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ break;
- case AMQDestination.TOPIC_TYPE:
- dest = new AMQTopic(exchange, routingKey, null);
- break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = new AMQTopic(exchange, routingKey, null);
+ break;
- default:
- dest = new AMQUndefinedDestination(exchange, routingKey, null);
- break;
+ default:
+ dest = new AMQUndefinedDestination(exchange, routingKey, null);
+ break;
}
//Destination dest = AMQDestination.createDestination(url);
setJMSDestination(dest);
@@ -203,7 +203,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
if (!(destination instanceof AMQDestination))
{
throw new IllegalArgumentException(
- "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
+ "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
}
final AMQDestination amqd = (AMQDestination) destination;
@@ -495,8 +495,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public abstract void clearBodyImpl() throws JMSException;
/**
- * Get a String representation of the body of the message. Used in the
- * toString() method which outputs this before message properties.
+ * Get a String representation of the body of the message. Used in the toString() method which outputs this before
+ * message properties.
*/
public abstract String toBodyString() throws JMSException;
@@ -519,7 +519,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
buf.append("\nJMS priority: ").append(getJMSPriority());
buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode());
buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
+ buf.append("\nJMS Redelivered: ").append(_redelivered);
+ buf.append("\nJMS Destination: ").append(getJMSDestination());
+ buf.append("\nJMS Type: ").append(getJMSType());
+ buf.append("\nJMS MessageID: ").append(getJMSMessageID());
buf.append("\nAMQ message number: ").append(_deliveryTag);
+
buf.append("\nProperties:");
if (getJmsHeaders().isEmpty())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 988a12ee78..d0cc52271a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -65,15 +65,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
/**
- * The connection that this protocol handler is associated with. There is a 1-1
- * mapping between connection instances and protocol handler instances.
+ * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances
+ * and protocol handler instances.
*/
private AMQConnection _connection;
- /**
- * Our wrapper for a protocol session that provides access to session values
- * in a typesafe manner.
- */
+ /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
private volatile AMQProtocolSession _protocolSession;
private AMQStateManager _stateManager = new AMQStateManager();
@@ -120,8 +117,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// we only add the SSL filter where we have an SSL connection
if (_connection.getSSLConfiguration() != null)
{
- SSLConfiguration sslConfig = _connection.getSSLConfiguration();
- SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+ SSLConfiguration sslConfig = _connection.getSSLConfiguration();
+ SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
sslFilter.setUseClientMode(true);
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
@@ -139,7 +136,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
e.printStackTrace();
}
-
+
_protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
@@ -154,6 +151,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* sessionClosed() depending on whether we were trying to send data at the time of failure.
*
* @param session
+ *
* @throws Exception
*/
public void sessionClosed(IoSession session) throws Exception
@@ -208,9 +206,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.info("Protocol Session [" + this + "] closed");
}
- /**
- * See {@link FailoverHandler} to see rationale for separate thread.
- */
+ /** See {@link FailoverHandler} to see rationale for separate thread. */
private void startFailoverThread()
{
Thread failoverThread = new Thread(_failoverHandler);
@@ -267,10 +263,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * There are two cases where we have other threads potentially blocking for events to be handled by this
- * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
- * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
- * react appropriately.
+ * There are two cases where we have other threads potentially blocking for events to be handled by this class.
+ * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
+ * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
*
* @param e the exception to propagate
*/
@@ -306,13 +301,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- switch(bodyFrame.getFrameType())
+ switch (bodyFrame.getFrameType())
{
case AMQMethodBody.TYPE:
if (debug)
{
- _logger.debug("Method frame received: " + frame);
+ _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
}
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
@@ -362,10 +357,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_protocolSession.messageContentBodyReceived(frame.getChannel(),
(ContentBody) bodyFrame);
break;
-
+
case HeartbeatBody.TYPE:
- if(debug)
+ if (debug)
{
_logger.debug("Received heartbeat");
}
@@ -413,8 +408,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * Convenience method that writes a frame to the protocol session. Equivalent
- * to calling getProtocolSession().write().
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
*
* @param frame the frame to write
*/
@@ -429,30 +424,28 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * Convenience method that writes a frame to the protocol session and waits for
- * a particular response. Equivalent to calling getProtocolSession().write() then
- * waiting for the response.
+ * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
+ * calling getProtocolSession().write() then waiting for the response.
*
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener)
+ BlockingMethodFrameListener listener)
throws AMQException
{
return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
}
/**
- * Convenience method that writes a frame to the protocol session and waits for
- * a particular response. Equivalent to calling getProtocolSession().write() then
- * waiting for the response.
+ * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
+ * calling getProtocolSession().write() then waiting for the response.
*
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener, long timeout)
+ BlockingMethodFrameListener listener, long timeout)
throws AMQException
{
try
@@ -477,17 +470,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
- /**
- * More convenient method to write a frame and wait for it's response.
- */
+ /** More convenient method to write a frame and wait for it's response. */
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
{
return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
}
- /**
- * More convenient method to write a frame and wait for it's response.
- */
+ /** More convenient method to write a frame and wait for it's response. */
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
{
return writeCommandFrameAndWaitForReply(frame,
@@ -495,9 +484,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * Convenience method to register an AMQSession with the protocol handler. Registering
- * a session with the protocol handler will ensure that messages are delivered to the
- * consumer(s) on that session.
+ * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol
+ * handler will ensure that messages are delivered to the consumer(s) on that session.
*
* @param channelId the channel id of the session
* @param session the session instance.
@@ -555,17 +543,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
- /**
- * @return the number of bytes read from this protocol session
- */
+ /** @return the number of bytes read from this protocol session */
public long getReadBytes()
{
return _protocolSession.getIoSession().getReadBytes();
}
- /**
- * @return the number of bytes written to this protocol session
- */
+ /** @return the number of bytes written to this protocol session */
public long getWrittenBytes()
{
return _protocolSession.getIoSession().getWrittenBytes();