diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-02-08 13:38:37 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-08 13:38:37 +0000 |
| commit | 208e8a556d64a717c9fec9e49354d6e5c69bbaac (patch) | |
| tree | 26854ab71db1188b5dad8965520f4c35130e25bd /qpid/java/client/src | |
| parent | 3b6459da09f50e039ba79fabd5da19c3b2cab1f5 (diff) | |
| download | qpid-python-208e8a556d64a717c9fec9e49354d6e5c69bbaac.tar.gz | |
QPID-
Broker
AMQChannel - Resend modified to add messages to Subscription resendQueue.
BasicRecoverMethodHandler - Now makes use of the Requeue boolean (needs test case, but is same logic as TxRollback)
TxRollbackHandler - Removed protocol Session from AMQChannel.resend()
AMQMessage - Changes comments, updated taken() to record the subscription that took the message
AMQQueue - Added DeliveryManager to Subscription constructors.
ConcurrentSelectorDeliveryManager - updated to get queue from Subscription and to know when the Subscriptions have content that needs Async delivery.
DeliveryManager - added update method to allow a subscription to tell DM it has content to send.
Subscription - new methods to handle resendQueue
SubscriptionFactory - changes to pass in the DeliveryManager
SubscriptionImpl - Comment changes, Constructor changes, implmentations of interface
Client
Recover and TxRollback now perform their broker methods while suspended.
RecoverTest - Added addition asserts to prevent NPEs
CommitRollbackTest - word change
RemoteSubscriptionImpl/SubscriptionTestHelper - Subscription implementation
AckTest - Update for new SubscriptionImpl constructor
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@504887 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
4 files changed, 40 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 755f2f271b..eb29d9d805 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -86,7 +86,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private int _nextTag = 1; /** This queue is bounded and is used to store messages before being dispatched to the consumer */ - private final FlowControllingBlockingQueue _queue; + public final FlowControllingBlockingQueue _queue; private Dispatcher _dispatcher; @@ -804,16 +804,44 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotTransacted(); // throws IllegalStateException if a transacted session // this is set only here, and the before the consumer's onMessage is called it is set to false _inRecovery = true; + + boolean isSuspended = isSuspended(); + + if (!isSuspended) + { + try + { + suspendChannel(true); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } for (BasicMessageConsumer consumer : _consumers.values()) { consumer.clearUnackedMessages(); } + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, (byte) 8, (byte) 0, // AMQP version (major, minor) false)); // requeue + + if (!isSuspended) + { + try + { + suspendChannel(false); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + } boolean isInRecovery() @@ -836,8 +864,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { consumer.acknowledge(); } - - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 4087db6562..847454e43e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -100,7 +100,7 @@ public class FlowControllingBlockingQueue { _logger.trace("Object added to queue:" + o); } - + if (_listener != null) { synchronized (_listener) @@ -112,5 +112,10 @@ public class FlowControllingBlockingQueue } } } + + public int size() + { + return _count; + } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index d12ab01bdc..d80d3ad87d 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -79,12 +79,15 @@ public class RecoverTest extends TestCase // no ack for last three messages so when I call recover I expect to get three messages back consumerSession.recover(); tm = (TextMessage) consumer.receive(3000); + assertNotNull("Message was null", tm); assertEquals("msg2", tm.getText()); tm = (TextMessage) consumer.receive(3000); + assertNotNull("Message was null", tm); assertEquals("msg3", tm.getText()); tm = (TextMessage) consumer.receive(3000); + assertNotNull("Message was null", tm); assertEquals("msg4", tm.getText()); _logger.info("Received redelivery of three messages. Acknowledging last message"); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index ce3ea01a09..c5ac530297 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -269,7 +269,7 @@ public class CommitRollbackTest extends TestCase _session.commit(); assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + assertEquals("test message was incorrect message", MESSAGE_TEXT, ((TextMessage) result).getText()); } @@ -297,4 +297,5 @@ public class CommitRollbackTest extends TestCase assertNull("test message should be null", result); } + } |
