summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-08 13:38:37 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-08 13:38:37 +0000
commit208e8a556d64a717c9fec9e49354d6e5c69bbaac (patch)
tree26854ab71db1188b5dad8965520f4c35130e25bd /qpid/java/client/src
parent3b6459da09f50e039ba79fabd5da19c3b2cab1f5 (diff)
downloadqpid-python-208e8a556d64a717c9fec9e49354d6e5c69bbaac.tar.gz
QPID-
Broker AMQChannel - Resend modified to add messages to Subscription resendQueue. BasicRecoverMethodHandler - Now makes use of the Requeue boolean (needs test case, but is same logic as TxRollback) TxRollbackHandler - Removed protocol Session from AMQChannel.resend() AMQMessage - Changes comments, updated taken() to record the subscription that took the message AMQQueue - Added DeliveryManager to Subscription constructors. ConcurrentSelectorDeliveryManager - updated to get queue from Subscription and to know when the Subscriptions have content that needs Async delivery. DeliveryManager - added update method to allow a subscription to tell DM it has content to send. Subscription - new methods to handle resendQueue SubscriptionFactory - changes to pass in the DeliveryManager SubscriptionImpl - Comment changes, Constructor changes, implmentations of interface Client Recover and TxRollback now perform their broker methods while suspended. RecoverTest - Added addition asserts to prevent NPEs CommitRollbackTest - word change RemoteSubscriptionImpl/SubscriptionTestHelper - Subscription implementation AckTest - Update for new SubscriptionImpl constructor git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@504887 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java32
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java7
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java3
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java3
4 files changed, 40 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 755f2f271b..eb29d9d805 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -86,7 +86,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private int _nextTag = 1;
/** This queue is bounded and is used to store messages before being dispatched to the consumer */
- private final FlowControllingBlockingQueue _queue;
+ public final FlowControllingBlockingQueue _queue;
private Dispatcher _dispatcher;
@@ -804,16 +804,44 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
checkNotTransacted(); // throws IllegalStateException if a transacted session
// this is set only here, and the before the consumer's onMessage is called it is set to false
_inRecovery = true;
+
+ boolean isSuspended = isSuspended();
+
+ if (!isSuspended)
+ {
+ try
+ {
+ suspendChannel(true);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
for (BasicMessageConsumer consumer : _consumers.values())
{
consumer.clearUnackedMessages();
}
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
(byte) 8, (byte) 0, // AMQP version (major, minor)
false)); // requeue
+
+ if (!isSuspended)
+ {
+ try
+ {
+ suspendChannel(false);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
}
boolean isInRecovery()
@@ -836,8 +864,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
consumer.acknowledge();
}
-
-
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index 4087db6562..847454e43e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -100,7 +100,7 @@ public class FlowControllingBlockingQueue
{
_logger.trace("Object added to queue:" + o);
}
-
+
if (_listener != null)
{
synchronized (_listener)
@@ -112,5 +112,10 @@ public class FlowControllingBlockingQueue
}
}
}
+
+ public int size()
+ {
+ return _count;
+ }
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index d12ab01bdc..d80d3ad87d 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -79,12 +79,15 @@ public class RecoverTest extends TestCase
// no ack for last three messages so when I call recover I expect to get three messages back
consumerSession.recover();
tm = (TextMessage) consumer.receive(3000);
+ assertNotNull("Message was null", tm);
assertEquals("msg2", tm.getText());
tm = (TextMessage) consumer.receive(3000);
+ assertNotNull("Message was null", tm);
assertEquals("msg3", tm.getText());
tm = (TextMessage) consumer.receive(3000);
+ assertNotNull("Message was null", tm);
assertEquals("msg4", tm.getText());
_logger.info("Received redelivery of three messages. Acknowledging last message");
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index ce3ea01a09..c5ac530297 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -269,7 +269,7 @@ public class CommitRollbackTest extends TestCase
_session.commit();
assertNotNull("test message was consumed and rolled back, but is gone", result);
- assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
+ assertEquals("test message was incorrect message", MESSAGE_TEXT, ((TextMessage) result).getText());
}
@@ -297,4 +297,5 @@ public class CommitRollbackTest extends TestCase
assertNull("test message should be null", result);
}
+
}