diff options
Diffstat (limited to 'java/client/src/main')
4 files changed, 78 insertions, 81 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ad600ddb40..89f596e541 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -255,13 +255,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _connectionStopped; } - void setConnectionStopped(boolean connectionStopped) + boolean setConnectionStopped(boolean connectionStopped) { + boolean currently; synchronized (_lock) { + currently = _connectionStopped; _connectionStopped = connectionStopped; _lock.notify(); } + return currently; } private void dispatchMessage(UnprocessedMessage message) @@ -543,7 +546,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (!isSuspended) { -// suspendChannel(true); + suspendChannel(true); } _connection.getProtocolHandler().syncWrite( @@ -556,7 +559,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (!isSuspended) { -// suspendChannel(false); + suspendChannel(false); } } catch (AMQException e) @@ -822,10 +825,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean isSuspended = isSuspended(); -// if (!isSuspended) -// { -// suspendChannel(true); -// } + if (!isSuspended) + { + suspendChannel(true); + } for (BasicMessageConsumer consumer : _consumers.values()) { @@ -841,15 +844,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false) // requeue , BasicRecoverOkBody.class); -// if (_dispatcher != null) -// { -// _dispatcher.rollback(); -// } -// -// if (!isSuspended) -// { -// suspendChannel(false); -// } + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + + if (!isSuspended) + { + suspendChannel(false); + } } catch (AMQException e) { @@ -1952,7 +1955,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_dispatcher == null) { rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); - } + }// if the dispatcher is running we have to do the clean up in the Ok Handler. } } @@ -2171,8 +2174,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi rejectMessagesForConsumerTag(null, requeue); } - /** @param consumerTag The consumerTag to prune from queue or all if null - * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) + /** + * @param consumerTag The consumerTag to prune from queue or all if null + * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) */ private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) @@ -2192,7 +2196,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi messages.remove(); -// rejectMessage(message.getDeliverBody().deliveryTag, requeue); + rejectMessage(message.getDeliverBody().deliveryTag, requeue); _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 496e377435..e9b914425a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -745,28 +745,32 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag); } - for (Object o : _synchronousQueue) + Iterator iterator = _synchronousQueue.iterator(); + while (iterator.hasNext()) { + Object o = iterator.next(); + if (o instanceof AbstractJMSMessage) { -// _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); + _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); if (_logger.isTraceEnabled()) { _logger.trace("Rejected message" + o); + iterator.remove(); } } else { _logger.error("Queue contained a :" + o.getClass() + - " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); } } if (_synchronousQueue.size() != 0) { - _logger.warn("Queue was not empty after rejecting all messages"); + _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size()); } _synchronousQueue.clear(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 67d74055c6..36dd4d400c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -87,17 +87,17 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach switch (contentType) { - case AMQDestination.QUEUE_TYPE: - dest = new AMQQueue(exchange, routingKey, routingKey); - break; + case AMQDestination.QUEUE_TYPE: + dest = new AMQQueue(exchange, routingKey, routingKey); + break; - case AMQDestination.TOPIC_TYPE: - dest = new AMQTopic(exchange, routingKey, null); - break; + case AMQDestination.TOPIC_TYPE: + dest = new AMQTopic(exchange, routingKey, null); + break; - default: - dest = new AMQUndefinedDestination(exchange, routingKey, null); - break; + default: + dest = new AMQUndefinedDestination(exchange, routingKey, null); + break; } //Destination dest = AMQDestination.createDestination(url); setJMSDestination(dest); @@ -203,7 +203,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach if (!(destination instanceof AMQDestination)) { throw new IllegalArgumentException( - "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); + "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); } final AMQDestination amqd = (AMQDestination) destination; @@ -495,8 +495,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public abstract void clearBodyImpl() throws JMSException; /** - * Get a String representation of the body of the message. Used in the - * toString() method which outputs this before message properties. + * Get a String representation of the body of the message. Used in the toString() method which outputs this before + * message properties. */ public abstract String toBodyString() throws JMSException; @@ -519,7 +519,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach buf.append("\nJMS priority: ").append(getJMSPriority()); buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode()); buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); + buf.append("\nJMS Redelivered: ").append(_redelivered); + buf.append("\nJMS Destination: ").append(getJMSDestination()); + buf.append("\nJMS Type: ").append(getJMSType()); + buf.append("\nJMS MessageID: ").append(getJMSMessageID()); buf.append("\nAMQ message number: ").append(_deliveryTag); + buf.append("\nProperties:"); if (getJmsHeaders().isEmpty()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 988a12ee78..d0cc52271a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -65,15 +65,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class); /** - * The connection that this protocol handler is associated with. There is a 1-1 - * mapping between connection instances and protocol handler instances. + * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances + * and protocol handler instances. */ private AMQConnection _connection; - /** - * Our wrapper for a protocol session that provides access to session values - * in a typesafe manner. - */ + /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ private volatile AMQProtocolSession _protocolSession; private AMQStateManager _stateManager = new AMQStateManager(); @@ -120,8 +117,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter // we only add the SSL filter where we have an SSL connection if (_connection.getSSLConfiguration() != null) { - SSLConfiguration sslConfig = _connection.getSSLConfiguration(); - SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + SSLConfiguration sslConfig = _connection.getSSLConfiguration(); + SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); sslFilter.setUseClientMode(true); session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); @@ -139,7 +136,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { e.printStackTrace(); } - + _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); _protocolSession.init(); } @@ -154,6 +151,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * sessionClosed() depending on whether we were trying to send data at the time of failure. * * @param session + * * @throws Exception */ public void sessionClosed(IoSession session) throws Exception @@ -208,9 +206,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.info("Protocol Session [" + this + "] closed"); } - /** - * See {@link FailoverHandler} to see rationale for separate thread. - */ + /** See {@link FailoverHandler} to see rationale for separate thread. */ private void startFailoverThread() { Thread failoverThread = new Thread(_failoverHandler); @@ -267,10 +263,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * There are two cases where we have other threads potentially blocking for events to be handled by this - * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a - * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can - * react appropriately. + * There are two cases where we have other threads potentially blocking for events to be handled by this class. + * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type + * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. * * @param e the exception to propagate */ @@ -306,13 +301,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - switch(bodyFrame.getFrameType()) + switch (bodyFrame.getFrameType()) { case AMQMethodBody.TYPE: if (debug) { - _logger.debug("Method frame received: " + frame); + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); } final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); @@ -362,10 +357,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); break; - + case HeartbeatBody.TYPE: - if(debug) + if (debug) { _logger.debug("Received heartbeat"); } @@ -413,8 +408,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method that writes a frame to the protocol session. Equivalent - * to calling getProtocolSession().write(). + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). * * @param frame the frame to write */ @@ -429,30 +424,28 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method that writes a frame to the protocol session and waits for - * a particular response. Equivalent to calling getProtocolSession().write() then - * waiting for the response. + * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to + * calling getProtocolSession().write() then waiting for the response. * * @param frame * @param listener the blocking listener. Note the calling thread will block. */ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener) + BlockingMethodFrameListener listener) throws AMQException { return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); } /** - * Convenience method that writes a frame to the protocol session and waits for - * a particular response. Equivalent to calling getProtocolSession().write() then - * waiting for the response. + * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to + * calling getProtocolSession().write() then waiting for the response. * * @param frame * @param listener the blocking listener. Note the calling thread will block. */ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener, long timeout) + BlockingMethodFrameListener listener, long timeout) throws AMQException { try @@ -477,17 +470,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter } - /** - * More convenient method to write a frame and wait for it's response. - */ + /** More convenient method to write a frame and wait for it's response. */ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException { return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT); } - /** - * More convenient method to write a frame and wait for it's response. - */ + /** More convenient method to write a frame and wait for it's response. */ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException { return writeCommandFrameAndWaitForReply(frame, @@ -495,9 +484,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method to register an AMQSession with the protocol handler. Registering - * a session with the protocol handler will ensure that messages are delivered to the - * consumer(s) on that session. + * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol + * handler will ensure that messages are delivered to the consumer(s) on that session. * * @param channelId the channel id of the session * @param session the session instance. @@ -555,17 +543,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter } - /** - * @return the number of bytes read from this protocol session - */ + /** @return the number of bytes read from this protocol session */ public long getReadBytes() { return _protocolSession.getIoSession().getReadBytes(); } - /** - * @return the number of bytes written to this protocol session - */ + /** @return the number of bytes written to this protocol session */ public long getWrittenBytes() { return _protocolSession.getIoSession().getWrittenBytes(); |
