summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java32
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java7
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java3
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java3
4 files changed, 40 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 755f2f271b..eb29d9d805 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -86,7 +86,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private int _nextTag = 1;
/** This queue is bounded and is used to store messages before being dispatched to the consumer */
- private final FlowControllingBlockingQueue _queue;
+ public final FlowControllingBlockingQueue _queue;
private Dispatcher _dispatcher;
@@ -804,16 +804,44 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
checkNotTransacted(); // throws IllegalStateException if a transacted session
// this is set only here, and the before the consumer's onMessage is called it is set to false
_inRecovery = true;
+
+ boolean isSuspended = isSuspended();
+
+ if (!isSuspended)
+ {
+ try
+ {
+ suspendChannel(true);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
for (BasicMessageConsumer consumer : _consumers.values())
{
consumer.clearUnackedMessages();
}
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
(byte) 8, (byte) 0, // AMQP version (major, minor)
false)); // requeue
+
+ if (!isSuspended)
+ {
+ try
+ {
+ suspendChannel(false);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
}
boolean isInRecovery()
@@ -836,8 +864,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
consumer.acknowledge();
}
-
-
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index 4087db6562..847454e43e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -100,7 +100,7 @@ public class FlowControllingBlockingQueue
{
_logger.trace("Object added to queue:" + o);
}
-
+
if (_listener != null)
{
synchronized (_listener)
@@ -112,5 +112,10 @@ public class FlowControllingBlockingQueue
}
}
}
+
+ public int size()
+ {
+ return _count;
+ }
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index d12ab01bdc..d80d3ad87d 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -79,12 +79,15 @@ public class RecoverTest extends TestCase
// no ack for last three messages so when I call recover I expect to get three messages back
consumerSession.recover();
tm = (TextMessage) consumer.receive(3000);
+ assertNotNull("Message was null", tm);
assertEquals("msg2", tm.getText());
tm = (TextMessage) consumer.receive(3000);
+ assertNotNull("Message was null", tm);
assertEquals("msg3", tm.getText());
tm = (TextMessage) consumer.receive(3000);
+ assertNotNull("Message was null", tm);
assertEquals("msg4", tm.getText());
_logger.info("Received redelivery of three messages. Acknowledging last message");
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index ce3ea01a09..c5ac530297 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -269,7 +269,7 @@ public class CommitRollbackTest extends TestCase
_session.commit();
assertNotNull("test message was consumed and rolled back, but is gone", result);
- assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
+ assertEquals("test message was incorrect message", MESSAGE_TEXT, ((TextMessage) result).getText());
}
@@ -297,4 +297,5 @@ public class CommitRollbackTest extends TestCase
assertNull("test message should be null", result);
}
+
}