diff options
Diffstat (limited to 'java/client/src/main')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 18 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 27 |
2 files changed, 41 insertions, 4 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 183865ac21..c25eb1f2c3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -136,7 +136,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private volatile AtomicBoolean _stopped = new AtomicBoolean(true); - + /** + * Set when recover is called. This is to handle the case where recover() is called by application code + * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called. + */ + private boolean _inRecovery; /** @@ -696,6 +700,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); checkNotTransacted(); // throws IllegalStateException if a transacted session + // this is set only here, and the before the consumer's onMessage is called it is set to false + _inRecovery = true; for (BasicMessageConsumer consumer : _consumers.values()) { consumer.clearUnackedMessages(); @@ -703,6 +709,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false)); } + boolean isInRecovery() + { + return _inRecovery; + } + + void setInRecovery(boolean inRecovery) + { + _inRecovery = inRecovery; + } + public void acknowledge() throws JMSException { if(isClosed()) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 3a5de6f10c..d3d9db3806 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -136,6 +136,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + /** + * The thread that was used to call receive(). This is important for being able to interrupt that thread if + * a receive() is in progress. + */ + private Thread _receivingThread; + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, @@ -236,6 +242,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); } + _session.setInRecovery(false); } private void acquireReceiving() throws JMSException @@ -248,11 +255,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { throw new javax.jms.IllegalStateException("A listener has already been set."); } + _receivingThread = Thread.currentThread(); } private void releaseReceiving() { _receiving.set(false); + _receivingThread = null; } public FieldTable getRawSelectorFieldTable() @@ -318,7 +327,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { - _logger.warn("Interrupted: " + e, e); + _logger.warn("Interrupted: " + e); return null; } finally @@ -399,6 +408,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer deregisterConsumer(); _unacknowledgedDeliveryTags.clear(); + if (_messageListener != null && _receiving.get()) + { + _logger.info("Interrupting thread: " + _receivingThread); + _receivingThread.interrupt(); + } } } } @@ -497,11 +511,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_dups_ok_acknowledge_send) { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), true); + } } break; case Session.AUTO_ACKNOWLEDGE: - _session.acknowledgeMessage(msg.getDeliveryTag(), false); + // we do not auto ack a message if the application code called recover() + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } break; case Session.SESSION_TRANSACTED: _lastDeliveryTag = msg.getDeliveryTag(); |
