diff options
| author | Robert Greig <rgreig@apache.org> | 2006-12-18 18:05:25 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2006-12-18 18:05:25 +0000 |
| commit | d07480d43ef288fc231e8b6d41c4650af2307d22 (patch) | |
| tree | a6e4d9de842ef6d5a49a8599ac3c3879b4126cc6 /qpid/java/client/src/main | |
| parent | dca40905e1cf833f966ce663798563be82490011 (diff) | |
| download | qpid-python-d07480d43ef288fc231e8b6d41c4650af2307d22.tar.gz | |
QPID-212 QPID-214 Patch supplied by Rob Godfrey
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@488377 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 18 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 27 |
2 files changed, 41 insertions, 4 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 183865ac21..c25eb1f2c3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -136,7 +136,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private volatile AtomicBoolean _stopped = new AtomicBoolean(true); - + /** + * Set when recover is called. This is to handle the case where recover() is called by application code + * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called. + */ + private boolean _inRecovery; /** @@ -696,6 +700,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); checkNotTransacted(); // throws IllegalStateException if a transacted session + // this is set only here, and the before the consumer's onMessage is called it is set to false + _inRecovery = true; for (BasicMessageConsumer consumer : _consumers.values()) { consumer.clearUnackedMessages(); @@ -703,6 +709,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false)); } + boolean isInRecovery() + { + return _inRecovery; + } + + void setInRecovery(boolean inRecovery) + { + _inRecovery = inRecovery; + } + public void acknowledge() throws JMSException { if(isClosed()) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 3a5de6f10c..d3d9db3806 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -136,6 +136,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + /** + * The thread that was used to call receive(). This is important for being able to interrupt that thread if + * a receive() is in progress. + */ + private Thread _receivingThread; + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, @@ -236,6 +242,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); } + _session.setInRecovery(false); } private void acquireReceiving() throws JMSException @@ -248,11 +255,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { throw new javax.jms.IllegalStateException("A listener has already been set."); } + _receivingThread = Thread.currentThread(); } private void releaseReceiving() { _receiving.set(false); + _receivingThread = null; } public FieldTable getRawSelectorFieldTable() @@ -318,7 +327,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { - _logger.warn("Interrupted: " + e, e); + _logger.warn("Interrupted: " + e); return null; } finally @@ -399,6 +408,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer deregisterConsumer(); _unacknowledgedDeliveryTags.clear(); + if (_messageListener != null && _receiving.get()) + { + _logger.info("Interrupting thread: " + _receivingThread); + _receivingThread.interrupt(); + } } } } @@ -497,11 +511,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_dups_ok_acknowledge_send) { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), true); + } } break; case Session.AUTO_ACKNOWLEDGE: - _session.acknowledgeMessage(msg.getDeliveryTag(), false); + // we do not auto ack a message if the application code called recover() + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } break; case Session.SESSION_TRANSACTED: _lastDeliveryTag = msg.getDeliveryTag(); |
