diff options
Diffstat (limited to 'qpid/java/client/src')
4 files changed, 40 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 755f2f271b..eb29d9d805 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -86,7 +86,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private int _nextTag = 1; /** This queue is bounded and is used to store messages before being dispatched to the consumer */ - private final FlowControllingBlockingQueue _queue; + public final FlowControllingBlockingQueue _queue; private Dispatcher _dispatcher; @@ -804,16 +804,44 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotTransacted(); // throws IllegalStateException if a transacted session // this is set only here, and the before the consumer's onMessage is called it is set to false _inRecovery = true; + + boolean isSuspended = isSuspended(); + + if (!isSuspended) + { + try + { + suspendChannel(true); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } for (BasicMessageConsumer consumer : _consumers.values()) { consumer.clearUnackedMessages(); } + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, (byte) 8, (byte) 0, // AMQP version (major, minor) false)); // requeue + + if (!isSuspended) + { + try + { + suspendChannel(false); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + } boolean isInRecovery() @@ -836,8 +864,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { consumer.acknowledge(); } - - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 4087db6562..847454e43e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -100,7 +100,7 @@ public class FlowControllingBlockingQueue { _logger.trace("Object added to queue:" + o); } - + if (_listener != null) { synchronized (_listener) @@ -112,5 +112,10 @@ public class FlowControllingBlockingQueue } } } + + public int size() + { + return _count; + } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index d12ab01bdc..d80d3ad87d 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -79,12 +79,15 @@ public class RecoverTest extends TestCase // no ack for last three messages so when I call recover I expect to get three messages back consumerSession.recover(); tm = (TextMessage) consumer.receive(3000); + assertNotNull("Message was null", tm); assertEquals("msg2", tm.getText()); tm = (TextMessage) consumer.receive(3000); + assertNotNull("Message was null", tm); assertEquals("msg3", tm.getText()); tm = (TextMessage) consumer.receive(3000); + assertNotNull("Message was null", tm); assertEquals("msg4", tm.getText()); _logger.info("Received redelivery of three messages. Acknowledging last message"); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index ce3ea01a09..c5ac530297 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -269,7 +269,7 @@ public class CommitRollbackTest extends TestCase _session.commit(); assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + assertEquals("test message was incorrect message", MESSAGE_TEXT, ((TextMessage) result).getText()); } @@ -297,4 +297,5 @@ public class CommitRollbackTest extends TestCase assertNull("test message should be null", result); } + } |
